diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/StringSelector.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/StringSelector.java new file mode 100644 index 0000000000..f92f4ec59c --- /dev/null +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/StringSelector.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +/** + * Fluent api for checking one or more strings and selecting the first non-empty one. + *

+ * {@link #toString()} returns the first encountered non-empty string or "". + *

+ * Optimized so that no intermediary objects are created, only one, once the first non-empty string is found. + */ +public interface StringSelector { + /** + * Starts the fluent expression by checking the first string(s). + * + * @param strings The first string(s) to check if empty. + * @return a {@link StringSelector} that checked the first string. + */ + static StringSelector of(String... strings) { + return EMPTY_STRING_SELECTOR.or(strings); + } + + /** + * Check the next string(s). + * + * @param strings The next string(s) to check if empty. + * @return a {@link StringSelector} that checked all strings so far. + */ + StringSelector or(String... strings); + + /** + * May be used to stop processing subsequent inputs when a result is already available. + * + * @return true if a non-empty string has been found, false otherwise. + */ + boolean found(); + + StringSelector EMPTY_STRING_SELECTOR = new StringSelector() { + @Override + public String toString() { + return ""; + } + + @Override + public StringSelector or(String... strings) { + for (String string : strings) { + if (string != null && string.length() > 0) { + return new StringSelector() { + @Override + public StringSelector or(String... string) { + return this; + } + + @Override + public String toString() { + return string; + } + + @Override + public boolean found() { + return true; + } + }; + } + } + + return EMPTY_STRING_SELECTOR; + } + + @Override + public boolean found() { + return false; + } + }; +} diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/StringSelectorTest.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/StringSelectorTest.java new file mode 100644 index 0000000000..06158f40b5 --- /dev/null +++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/StringSelectorTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class StringSelectorTest { + @Test + public void testNull() { + test("", false, (String) null); + } + + @Test + public void testEmpty() { + test("", false, ""); + } + + @Test + public void testNull_Empty() { + test("", false, null, ""); + } + + @Test + public void testNull_Empty_NonEmpty() { + test("expected", true, null, "", "expected"); + } + + @Test + public void testNonEmpty_Null_NonEmpty() { + test("expected", true, "expected", null, "other"); + } + + @Test + public void testNonEmpty_Empty_NonEmpty() { + test("expected", true, "expected", "", "other"); + } + + @Test + public void testTwoNonEmpty() { + test("expected", true, "expected", "other"); + } + + @Test + public void testManyInputsWithNoExpected() { + test( + "", + false, + new String[]{null, "", "", ""}, + new String[]{null, null, ""}, + new String[]{null, "", null} + ); + } + + @Test + public void testManyInputsWithExpectedInFirstBatch() { + test( + "expected", + true, + new String[]{null, "expected", "", ""}, + new String[]{null, null, ""}, + new String[]{null, "other", "andAnother"} + ); + } + + @Test + public void testManyInputsWithExpectedInLaterBatch() { + test( + "expected", + true, + new String[]{null, "", "", ""}, + new String[]{null, null, "expected"}, + new String[]{null, "other", "andAnother"} + ); + } + + public void test(String expected, boolean expectedFound, String... inputs) { + // GIVEN + + // WHEN + StringSelector selector = StringSelector.of(inputs); + + // THEN + boolean actualFound = selector.found(); + String actual = selector.toString(); + + assertEquals(expected, actual); + assertEquals(expectedFound, actualFound); + } + + public void test(String expected, boolean expectedFound, String[] firstInputs, String[]... otherInputs) { + // GIVEN + + // WHEN + StringSelector selector = StringSelector.of(firstInputs); + for (String[] otherInput : otherInputs) { + selector = selector.or(otherInput); + + if (selector.found()) { + assertEquals(expected, selector.toString()); + } else { + assertEquals("", selector.toString()); + } + } + + // THEN + boolean actualFound = selector.found(); + String actual = selector.toString(); + + assertEquals(expected, actual); + assertEquals(expectedFound, actualFound); + + } +} diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/AtlasUtils.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/AtlasUtils.java index 226bfa162a..2e5517544d 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/AtlasUtils.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/AtlasUtils.java @@ -36,15 +36,15 @@ public class AtlasUtils { return guid != null && !guid.startsWith("-"); } - public static String toQualifiedName(String clusterName, String componentId) { - return componentId + "@" + clusterName; + public static String toQualifiedName(String namespace, String componentId) { + return componentId + "@" + namespace; } public static String getComponentIdFromQualifiedName(String qualifiedName) { return qualifiedName.split("@")[0]; } - public static String getClusterNameFromQualifiedName(String qualifiedName) { + public static String getNamespaceFromQualifiedName(String qualifiedName) { return qualifiedName.split("@")[1]; } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java index e40e034451..3436ff346d 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java @@ -178,13 +178,13 @@ public class NiFiAtlasClient { /** * Fetch existing NiFiFlow entity from Atlas. * @param rootProcessGroupId The id of a NiFi flow root process group. - * @param clusterName The cluster name of a flow. + * @param namespace The namespace of a flow. * @return A NiFiFlow instance filled with retrieved data from Atlas. Status objects are left blank, e.g. ProcessorStatus. * @throws AtlasServiceException Thrown if requesting to Atlas API failed, including when the flow is not found. */ - public NiFiFlow fetchNiFiFlow(String rootProcessGroupId, String clusterName) throws AtlasServiceException { + public NiFiFlow fetchNiFiFlow(String rootProcessGroupId, String namespace) throws AtlasServiceException { - final String qualifiedName = AtlasUtils.toQualifiedName(clusterName, rootProcessGroupId); + final String qualifiedName = AtlasUtils.toQualifiedName(namespace, rootProcessGroupId); final AtlasObjectId flowId = new AtlasObjectId(TYPE_NIFI_FLOW, ATTR_QUALIFIED_NAME, qualifiedName); final AtlasEntity.AtlasEntityWithExtInfo nifiFlowExt = searchEntityDef(flowId); @@ -198,7 +198,7 @@ public class NiFiAtlasClient { final NiFiFlow nifiFlow = new NiFiFlow(rootProcessGroupId); nifiFlow.setExEntity(nifiFlowEntity); nifiFlow.setFlowName(toStr(attributes.get(ATTR_NAME))); - nifiFlow.setClusterName(clusterName); + nifiFlow.setNamespace(namespace); nifiFlow.setUrl(toStr(attributes.get(ATTR_URL))); nifiFlow.setDescription(toStr(attributes.get(ATTR_DESCRIPTION))); diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlow.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlow.java index c3920c8693..98e686b094 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlow.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlow.java @@ -45,7 +45,6 @@ import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName; import static org.apache.nifi.atlas.AtlasUtils.isGuidAssigned; import static org.apache.nifi.atlas.AtlasUtils.isUpdated; import static org.apache.nifi.atlas.AtlasUtils.updateMetadata; -import static org.apache.nifi.atlas.NiFiTypes.ATTR_CLUSTER_NAME; import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION; import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS; import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; @@ -65,7 +64,7 @@ public class NiFiFlow { private final String rootProcessGroupId; private String flowName; - private String clusterName; + private String namespace; private String url; private String atlasGuid; private AtlasEntity exEntity; @@ -112,13 +111,13 @@ public class NiFiFlow { return rootProcessGroupId; } - public String getClusterName() { - return clusterName; + public String getNamespace() { + return namespace; } - public void setClusterName(String clusterName) { - updateMetadata(metadataUpdated, updateAudit, ATTR_CLUSTER_NAME, this.clusterName, clusterName); - this.clusterName = clusterName; + public void setNamespace(String namespace) { + updateMetadata(metadataUpdated, updateAudit, "namespace", this.namespace, namespace); + this.namespace = namespace; atlasObjectId = createAtlasObjectId(); } @@ -370,7 +369,7 @@ public class NiFiFlow { } public String toQualifiedName(String componentId) { - return AtlasUtils.toQualifiedName(clusterName, componentId); + return AtlasUtils.toQualifiedName(namespace, componentId); } public enum EntityChangeType { diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java index c0071f032c..4562139127 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java @@ -16,7 +16,7 @@ */ package org.apache.nifi.atlas.provenance; -import org.apache.nifi.atlas.resolver.ClusterResolver; +import org.apache.nifi.atlas.resolver.NamespaceResolver; import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.lineage.ComputeLineageResult; @@ -24,8 +24,8 @@ import org.apache.nifi.provenance.lineage.ComputeLineageResult; import java.util.List; public interface AnalysisContext { - String getNiFiClusterName(); - ClusterResolver getClusterResolver(); + String getNiFiNamespace(); + NamespaceResolver getNamespaceResolver(); List findConnectionTo(String componentId); List findConnectionFrom(String componentId); ComputeLineageResult queryLineage(long eventId); diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java index 1f761262da..3ca9ff1d91 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java @@ -17,7 +17,7 @@ package org.apache.nifi.atlas.provenance; import org.apache.nifi.atlas.NiFiFlow; -import org.apache.nifi.atlas.resolver.ClusterResolver; +import org.apache.nifi.atlas.resolver.NamespaceResolver; import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceRepository; @@ -34,13 +34,13 @@ public class StandardAnalysisContext implements AnalysisContext { private final Logger logger = LoggerFactory.getLogger(StandardAnalysisContext.class); private final NiFiFlow nifiFlow; - private final ClusterResolver clusterResolver; + private final NamespaceResolver namespaceResolver; private final ProvenanceRepository provenanceRepository; - public StandardAnalysisContext(NiFiFlow nifiFlow, ClusterResolver clusterResolver, + public StandardAnalysisContext(NiFiFlow nifiFlow, NamespaceResolver namespaceResolver, ProvenanceRepository provenanceRepository) { this.nifiFlow = nifiFlow; - this.clusterResolver = clusterResolver; + this.namespaceResolver = namespaceResolver; this.provenanceRepository = provenanceRepository; } @@ -55,13 +55,13 @@ public class StandardAnalysisContext implements AnalysisContext { } @Override - public String getNiFiClusterName() { - return nifiFlow.getClusterName(); + public String getNiFiNamespace() { + return nifiFlow.getNamespace(); } @Override - public ClusterResolver getClusterResolver() { - return clusterResolver; + public NamespaceResolver getNamespaceResolver() { + return namespaceResolver; } private ComputeLineageResult getLineageResult(long eventId, ComputeLineageSubmission submission) { diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractHiveAnalyzer.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractHiveAnalyzer.java index 3b3b105c2c..24514175ed 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractHiveAnalyzer.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractHiveAnalyzer.java @@ -32,19 +32,21 @@ public abstract class AbstractHiveAnalyzer extends AbstractNiFiProvenanceEventAn static final String TYPE_TABLE = "hive_table"; static final String ATTR_DB = "db"; - protected Referenceable createDatabaseRef(String clusterName, String databaseName) { + protected Referenceable createDatabaseRef(String namespace, String databaseName) { final Referenceable ref = new Referenceable(TYPE_DATABASE); ref.set(ATTR_NAME, databaseName); - ref.set(ATTR_CLUSTER_NAME, clusterName); - ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(clusterName, databaseName)); + // The attribute 'clusterName' is in the 'hive_db' Atlas entity so it cannot be changed. + // Using 'namespace' as value for lack of better solution. + ref.set(ATTR_CLUSTER_NAME, namespace); + ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, databaseName)); return ref; } - protected Referenceable createTableRef(String clusterName, Tuple tableName) { + protected Referenceable createTableRef(String namespace, Tuple tableName) { final Referenceable ref = new Referenceable(TYPE_TABLE); ref.set(ATTR_NAME, tableName.getValue()); - ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(clusterName, toTableNameStr(tableName))); - ref.set(ATTR_DB, createDatabaseRef(clusterName, tableName.getKey())); + ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, toTableNameStr(tableName))); + ref.set(ATTR_DB, createDatabaseRef(namespace, tableName.getKey())); return ref; } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java index 7a1a589ef7..59cb2d91ac 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java @@ -49,12 +49,12 @@ public class FilePath extends AbstractNiFiProvenanceEventAnalyzer { public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) { final Referenceable ref = new Referenceable(TYPE); final URI uri = parseUri(event.getTransitUri()); - final String clusterName; + final String namespace; try { // use hostname in uri if available for remote path. final String uriHost = uri.getHost(); final String hostname = StringUtils.isEmpty(uriHost) ? InetAddress.getLocalHost().getHostName() : uriHost; - clusterName = context.getClusterResolver().fromHostNames(hostname); + namespace = context.getNamespaceResolver().fromHostNames(hostname); } catch (UnknownHostException e) { logger.warn("Failed to get localhost name due to " + e, e); return null; @@ -63,7 +63,7 @@ public class FilePath extends AbstractNiFiProvenanceEventAnalyzer { final String path = uri.getPath(); ref.set(ATTR_NAME, path); ref.set(ATTR_PATH, path); - ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(clusterName, path)); + ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, path)); return singleDataSetRef(event.getComponentId(), event.getEventType(), ref); } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HBaseTable.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HBaseTable.java index c9e74008bd..e2ff8fb2c5 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HBaseTable.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HBaseTable.java @@ -34,7 +34,7 @@ import static org.apache.nifi.atlas.NiFiTypes.ATTR_URI; /** * Analyze a transit URI as a HBase table. - *

  • qualifiedName=tableName@clusterName (example: myTable@cl1) + *
  • qualifiedName=tableName@namespace (example: myTable@ns1) *
  • name=tableName (example: myTable) */ public class HBaseTable extends AbstractNiFiProvenanceEventAnalyzer { @@ -57,11 +57,11 @@ public class HBaseTable extends AbstractNiFiProvenanceEventAnalyzer { final Referenceable ref = new Referenceable(TYPE); final String[] hostNames = splitHostNames(uriMatcher.group(1)); - final String clusterName = context.getClusterResolver().fromHostNames(hostNames); + final String namespace = context.getNamespaceResolver().fromHostNames(hostNames); final String tableName = uriMatcher.group(2); ref.set(ATTR_NAME, tableName); - ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(clusterName, tableName)); + ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, tableName)); // TODO: 'uri' is a mandatory attribute, but what should we set? ref.set(ATTR_URI, transitUri); diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java index 5cf77aba39..6b5cf0b866 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java @@ -32,7 +32,7 @@ import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; /** * Analyze a transit URI as a HDFS path. - *
  • qualifiedName=/path/fileName@clusterName (example: /app/warehouse/hive/db/default@cl1) + *
  • qualifiedName=/path/fileName@namespace (example: /app/warehouse/hive/db/default@ns1) *
  • name=/path/fileName (example: /app/warehouse/hive/db/default) */ public class HDFSPath extends AbstractNiFiProvenanceEventAnalyzer { @@ -43,12 +43,14 @@ public class HDFSPath extends AbstractNiFiProvenanceEventAnalyzer { public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) { final Referenceable ref = new Referenceable(TYPE); final URI uri = parseUri(event.getTransitUri()); - final String clusterName = context.getClusterResolver().fromHostNames(uri.getHost()); + final String namespace = context.getNamespaceResolver().fromHostNames(uri.getHost()); final String path = uri.getPath(); ref.set(ATTR_NAME, path); ref.set(ATTR_PATH, path); - ref.set(ATTR_CLUSTER_NAME, clusterName); - ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(clusterName, path)); + // The attribute 'clusterName' is in the 'hdfs_path' Atlas entity so it cannot be changed. + // Using 'namespace' as value for lack of better solution. + ref.set(ATTR_CLUSTER_NAME, namespace); + ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, path)); return singleDataSetRef(event.getComponentId(), event.getEventType(), ref); } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java index 7821f866df..51babe8d46 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java @@ -39,13 +39,13 @@ import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.par * -

    Cluster Name Resolution

    +

    Namespaces

    An entity in Atlas can be identified by its GUID for any existing objects, or type name and unique attribute can be used if GUID is not known. Qualified name is commonly used as the unique attribute.

    -

    Since one Atlas instance can be used to manage multiple environments, i.e clusters, Atlas has to manage objects in different clusters those may have the same name. For example, a Hive table 'request_logs' in a 'cluster-A' and 'cluster-B'. In such case, cluster name embedded in qualified names are crucial.

    +

    One Atlas instance can be used to manage multiple environments and objects in different environments may have the same name. For example, a Hive table 'request_logs' in two different clusters, 'cluster-A' and 'cluster-B'. For this reason the qualified names contain a so-called metadata namespace.

    +

    It's common practice to provide the cluster name as the namespace, but it can be any arbitrary string.

    -

    For these requirements, a qualified name has 'componentId@clusterName' format. E.g. A Hive table qualified name would be dbName.tableName@clusterName (default.request_logs@cluster-A).

    +

    With this, a qualified name has 'componentId@namespace' format. E.g. A Hive table qualified name would be dbName.tableName@namespace (default.request_logs@cluster-A).

    -

    From this NiFi reporting task standpoint, a cluster name is need to be resolved at following situations: +

    From this NiFi reporting task standpoint, a namespace is needed to be resolved at following situations:

    -

    To answer such questions, ReportLineageToAtlas reporting task provides a way to define mappings from ip address or hostname to a cluster name. - The mapping can be defined by Dynamic Properties with a name in 'hostnamePattern.ClusterName' format, having its value as a set of Regular Expression Patterns to match ip addresses or host names to a particular cluster name.

    +

    To answer such questions, ReportLineageToAtlas reporting task provides a way to define mappings from IP address or hostname to a namespace. + The mapping can be defined by Dynamic Properties with a name in 'hostnamePattern.namespace' format, having its value as a set of Regular Expression Patterns to match IP addresses or host names to a particular namespace.

    -

    As an example, following mapping definition would resolve cluster name 'cluster-A' for ip address such as '192.168.30.123' or hostname 'namenode1.a.example.com', and 'cluster-B' for '192.168.40.223' or 'nifi3.b.example.com'.

    +

    As an example, following mapping definition would resolve namespace 'namespace-A' for IP address such as '192.168.30.123' or hostname 'namenode1.a.example.com', and 'namespace-B' for '192.168.40.223' or 'nifi3.b.example.com'.

    -# Dynamic Property Name for cluster-A
    -hostnamePattern.cluster-A
    +# Dynamic Property Name for namespace-A
    +hostnamePattern.namespace-A
     # Value can have multiple Regular Expression patterns separated by new line
     192\.168\.30\.\d+
     [^\.]+\.a\.example\.com
     
    -# Dynamic Property Name for cluster-B
    -hostnamePattern.cluster-B
    +# Dynamic Property Name for namespace-B
    +hostnamePattern.namespace-B
     # Values
     192\.168\.40\.\d+
     [^\.]+\.b\.example\.com
             
    -

    If any cluster name mapping does not match, then a name defined at 'Atlas Default Cluster Name' is used.

    +

    If no namespace mapping matches, then a name defined at 'Atlas Default Metadata Namespace' is used.

    NiFi flow structure

    @@ -271,11 +272,11 @@ Processor 3

    To identify such Process and DataSet Atlas entities, this reporting task uses NiFi Provenance Events. At least, the reporting task needs to derive following information from a NiFi Provenance event record:

    -

    'clusterName' in 'qualifiedName' attribute is resolved by mapping ip-address or hostname available at NiFi Provenance event 'transitUri' to a cluster name. See Cluster Name Resolution for detail.

    +

    'namespace' in 'qualifiedName' attribute is resolved by mapping ip-address or hostname available at NiFi Provenance event 'transitUri' to a namespace. See Namespaces for detail.

    For 'typeName' and 'qualifiedName', different analysis rules are needed for different DataSet. ReportLineageToAtlas provides an extension point called 'NiFiProvenanceEventAnalyzer' to implement such analysis logic for particular DataSets.

    @@ -327,8 +328,8 @@ Processor 3 nifi_input_port - rootGroupPortGUID@clusterName - (e.g. 35dbc0ab-015e-1000-144c-a8d71255027d@cl1) + rootGroupPortGUID@namespace + (e.g. 35dbc0ab-015e-1000-144c-a8d71255027d@ns1) @@ -343,7 +344,7 @@ upstream (nifi_flow_path) (nifi_input_port) - remoteInputPortGUID@clusterName
    (e.g. f31a6b53-3077-4c59-144c-a8d71255027d@cl1) + remoteInputPortGUID@namespace
    (e.g. f31a6b53-3077-4c59-144c-a8d71255027d@ns1)

    NOTE: The remoteInputPortGUID is the client side component ID and different from the remote target port GUID. Multiple Remote Input Ports can send to the same target remote input port.

    @@ -363,8 +364,8 @@ upstream (nifi_flow_path) nifi_output_port - rootGroupPortGUID@clusterName - (e.g. 45dbc0ab-015e-1000-144c-a8d71255027d@cl1) + rootGroupPortGUID@namespace + (e.g. 45dbc0ab-015e-1000-144c-a8d71255027d@ns1) @@ -382,9 +383,9 @@ remote target port @@ -409,7 +410,7 @@ remote target port nifi_input_port
    nifi_output_port - rootGroupPortGUID@clusterName
    (e.g. 35dbc0ab-015e-1000-144c-a8d71255027d@cl1) + rootGroupPortGUID@namespace
    (e.g. 35dbc0ab-015e-1000-144c-a8d71255027d@ns1) @@ -435,7 +436,7 @@ remote target port (Protocol can be either PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL) kafka_topic - topicName@clusterName
    (e.g. testTopic@cl1) + topicName@namespace
    (e.g. testTopic@ns1) NOTE:With Atlas earlier than 0.8.2, the same topic name in different clusters can not be created using the pre-built 'kafka_topic'. See ATLAS-2286. @@ -444,7 +445,7 @@ remote target port SEND thrift://hive.example.com:9083 hive_table - tableName@clusterName
    (e.g. myTable@cl1) + tableName@namespace
    (e.g. myTable@ns1) @@ -460,7 +461,7 @@ remote target port jdbc:hive2://hive.example.com:10000/default hive_table - tableName@clusterName
    (e.g. myTable@cl1) + tableName@namespace
    (e.g. myTable@ns1) The corresponding Processors parse Hive QL to set 'query.input.tables' and 'query.output.tables' FlowFile attributes. These attribute values are used to create qualified name. @@ -485,7 +486,7 @@ remote target port hdfs://nn.example.com:8020/user/nifi/5262553828219 hdfs_path - /path/fileName@clusterName
    (e.g. /app/warehouse/hive/db/default@cl1) + /path/fileName@namespace
    (e.g. /app/warehouse/hive/db/default@ns1) @@ -508,7 +509,7 @@ remote target port hbase://hmaster.example.com:16000/tableA/rowX hbase_table - tableName@clusterName
    (e.g. myTable@cl1) + tableName@namespace
    (e.g. myTable@ns1) @@ -541,7 +542,7 @@ remote target port nifi_data - processorGuid@clusterName
    db8bb12c-5cd3-3011-c971-579f460ebedf@cl1 + processorGuid@namespace
    db8bb12c-5cd3-3011-c971-579f460ebedf@ns1 diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/TestNiFiFlowAnalyzer.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/TestNiFiFlowAnalyzer.java index 64624e93e6..33efe45b24 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/TestNiFiFlowAnalyzer.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/TestNiFiFlowAnalyzer.java @@ -63,10 +63,10 @@ public class TestNiFiFlowAnalyzer { final NiFiFlowAnalyzer analyzer = new NiFiFlowAnalyzer(); final NiFiFlow nifiFlow = new NiFiFlow(rootPG.getId()); - nifiFlow.setClusterName("cluster1"); + nifiFlow.setNamespace("namespace1"); analyzer.analyzeProcessGroup(nifiFlow, rootPG); - assertEquals("1234-5678-0000-0000@cluster1", nifiFlow.getQualifiedName()); + assertEquals("1234-5678-0000-0000@namespace1", nifiFlow.getQualifiedName()); } private ProcessorStatus createProcessor(ProcessGroupStatus pgStatus, String type) { @@ -237,7 +237,7 @@ public class TestNiFiFlowAnalyzer { final NiFiFlowAnalyzer analyzer = new NiFiFlowAnalyzer(); final NiFiFlow nifiFlow = new NiFiFlow(rootPG.getId()); - nifiFlow.setClusterName("cluster1"); + nifiFlow.setNamespace("namespace1"); analyzer.analyzeProcessGroup(nifiFlow, rootPG); assertEquals(4, nifiFlow.getProcessors().size()); @@ -259,7 +259,7 @@ public class TestNiFiFlowAnalyzer { assertEquals(1, pathC.getInputs().size()); final AtlasObjectId queue = pathC.getInputs().iterator().next(); assertEquals(TYPE_NIFI_QUEUE, queue.getTypeName()); - assertEquals(toQualifiedName("cluster1", pathC.getId()), queue.getUniqueAttributes().get(ATTR_QUALIFIED_NAME)); + assertEquals(toQualifiedName("namespace1", pathC.getId()), queue.getUniqueAttributes().get(ATTR_QUALIFIED_NAME)); // Should be able to find a path from a given processor GUID. final NiFiFlowPath pathForPr0 = nifiFlow.findPath(pr0.getId()); diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHBaseTable.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHBaseTable.java index 5217e742e2..5d7296c56e 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHBaseTable.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHBaseTable.java @@ -21,7 +21,7 @@ import org.apache.nifi.atlas.provenance.AnalysisContext; import org.apache.nifi.atlas.provenance.DataSetRefs; import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer; import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory; -import org.apache.nifi.atlas.resolver.ClusterResolvers; +import org.apache.nifi.atlas.resolver.NamespaceResolvers; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.junit.Test; @@ -45,11 +45,11 @@ public class TestHBaseTable { when(record.getTransitUri()).thenReturn(transitUri); when(record.getEventType()).thenReturn(ProvenanceEventType.FETCH); - final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); - when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class); + when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1"); final AnalysisContext context = Mockito.mock(AnalysisContext.class); - when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.getNamespaceResolver()).thenReturn(namespaceResolvers); final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType()); assertNotNull(analyzer); @@ -60,7 +60,7 @@ public class TestHBaseTable { Referenceable ref = refs.getInputs().iterator().next(); assertEquals("hbase_table", ref.getTypeName()); assertEquals("tableA", ref.get(ATTR_NAME)); - assertEquals("tableA@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("tableA@namespace1", ref.get(ATTR_QUALIFIED_NAME)); } @Test @@ -72,14 +72,14 @@ public class TestHBaseTable { when(record.getTransitUri()).thenReturn(transitUri); when(record.getEventType()).thenReturn(ProvenanceEventType.FETCH); - final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); - when(clusterResolvers.fromHostNames( + final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class); + when(namespaceResolvers.fromHostNames( matches("zk0.example.com"), matches("zk2.example.com"), - matches("zk3.example.com"))).thenReturn("cluster1"); + matches("zk3.example.com"))).thenReturn("namespace1"); final AnalysisContext context = Mockito.mock(AnalysisContext.class); - when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.getNamespaceResolver()).thenReturn(namespaceResolvers); final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType()); assertNotNull(analyzer); @@ -90,7 +90,7 @@ public class TestHBaseTable { Referenceable ref = refs.getInputs().iterator().next(); assertEquals("hbase_table", ref.getTypeName()); assertEquals("tableA", ref.get(ATTR_NAME)); - assertEquals("tableA@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("tableA@namespace1", ref.get(ATTR_QUALIFIED_NAME)); } } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java index 3d94a33b32..6edc09fda2 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java @@ -21,7 +21,7 @@ import org.apache.nifi.atlas.provenance.AnalysisContext; import org.apache.nifi.atlas.provenance.DataSetRefs; import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer; import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory; -import org.apache.nifi.atlas.resolver.ClusterResolvers; +import org.apache.nifi.atlas.resolver.NamespaceResolvers; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.junit.Test; @@ -46,11 +46,11 @@ public class TestHDFSPath { when(record.getTransitUri()).thenReturn(transitUri); when(record.getEventType()).thenReturn(ProvenanceEventType.SEND); - final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); - when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class); + when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1"); final AnalysisContext context = Mockito.mock(AnalysisContext.class); - when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.getNamespaceResolver()).thenReturn(namespaceResolvers); final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType()); assertNotNull(analyzer); @@ -61,6 +61,6 @@ public class TestHDFSPath { Referenceable ref = refs.getOutputs().iterator().next(); assertEquals("hdfs_path", ref.getTypeName()); assertEquals("/user/nifi/fileA", ref.get(ATTR_NAME)); - assertEquals("/user/nifi/fileA@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("/user/nifi/fileA@namespace1", ref.get(ATTR_QUALIFIED_NAME)); } } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java index e7e6a91519..82f4e1e294 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java @@ -21,7 +21,7 @@ import org.apache.nifi.atlas.provenance.AnalysisContext; import org.apache.nifi.atlas.provenance.DataSetRefs; import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer; import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory; -import org.apache.nifi.atlas.resolver.ClusterResolvers; +import org.apache.nifi.atlas.resolver.NamespaceResolvers; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.junit.Test; @@ -56,11 +56,11 @@ public class TestHive2JDBC { when(record.getTransitUri()).thenReturn(transitUri); when(record.getEventType()).thenReturn(ProvenanceEventType.SEND); - final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); - when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class); + when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1"); final AnalysisContext context = Mockito.mock(AnalysisContext.class); - when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.getNamespaceResolver()).thenReturn(namespaceResolvers); final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType()); assertNotNull(analyzer); @@ -71,7 +71,7 @@ public class TestHive2JDBC { Referenceable ref = refs.getOutputs().iterator().next(); assertEquals("hive_db", ref.getTypeName()); assertEquals("database_a", ref.get(ATTR_NAME)); - assertEquals("database_a@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("database_a@namespace1", ref.get(ATTR_QUALIFIED_NAME)); } /** @@ -90,11 +90,11 @@ public class TestHive2JDBC { when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("table_A1, table_A2"); when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_B.table_B1"); - final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); - when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class); + when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1"); final AnalysisContext context = Mockito.mock(AnalysisContext.class); - when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.getNamespaceResolver()).thenReturn(namespaceResolvers); final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType()); assertNotNull(analyzer); @@ -103,8 +103,8 @@ public class TestHive2JDBC { assertEquals(2, refs.getInputs().size()); // QualifiedName : Name final Map expectedInputRefs = new HashMap<>(); - expectedInputRefs.put("database_a.table_a1@cluster1", "table_a1"); - expectedInputRefs.put("database_a.table_a2@cluster1", "table_a2"); + expectedInputRefs.put("database_a.table_a1@namespace1", "table_a1"); + expectedInputRefs.put("database_a.table_a2@namespace1", "table_a2"); for (Referenceable ref : refs.getInputs()) { final String qName = (String) ref.get(ATTR_QUALIFIED_NAME); assertTrue(expectedInputRefs.containsKey(qName)); @@ -115,7 +115,7 @@ public class TestHive2JDBC { Referenceable ref = refs.getOutputs().iterator().next(); assertEquals("hive_table", ref.getTypeName()); assertEquals("table_b1", ref.get(ATTR_NAME)); - assertEquals("database_b.table_b1@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("database_b.table_b1@namespace1", ref.get(ATTR_QUALIFIED_NAME)); } /** @@ -134,11 +134,11 @@ public class TestHive2JDBC { when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("table_A1, table_A2"); when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_B.table_B1"); - final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); - when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class); + when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1"); final AnalysisContext context = Mockito.mock(AnalysisContext.class); - when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.getNamespaceResolver()).thenReturn(namespaceResolvers); final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType()); assertNotNull(analyzer); @@ -147,8 +147,8 @@ public class TestHive2JDBC { assertEquals(2, refs.getInputs().size()); // QualifiedName : Name final Map expectedInputRefs = new HashMap<>(); - expectedInputRefs.put("default.table_a1@cluster1", "table_a1"); - expectedInputRefs.put("default.table_a2@cluster1", "table_a2"); + expectedInputRefs.put("default.table_a1@namespace1", "table_a1"); + expectedInputRefs.put("default.table_a2@namespace1", "table_a2"); for (Referenceable ref : refs.getInputs()) { final String qName = (String) ref.get(ATTR_QUALIFIED_NAME); assertTrue(expectedInputRefs.containsKey(qName)); @@ -159,7 +159,7 @@ public class TestHive2JDBC { Referenceable ref = refs.getOutputs().iterator().next(); assertEquals("hive_table", ref.getTypeName()); assertEquals("table_b1", ref.get(ATTR_NAME)); - assertEquals("database_b.table_b1@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("database_b.table_b1@namespace1", ref.get(ATTR_QUALIFIED_NAME)); } /** @@ -177,11 +177,11 @@ public class TestHive2JDBC { when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("table_A1, table_A2"); when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_B.table_B1"); - final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); - when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class); + when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1"); final AnalysisContext context = Mockito.mock(AnalysisContext.class); - when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.getNamespaceResolver()).thenReturn(namespaceResolvers); final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType()); assertNotNull(analyzer); @@ -190,8 +190,8 @@ public class TestHive2JDBC { assertEquals(2, refs.getInputs().size()); // QualifiedName : Name final Map expectedInputRefs = new HashMap<>(); - expectedInputRefs.put("default.table_a1@cluster1", "table_a1"); - expectedInputRefs.put("default.table_a2@cluster1", "table_a2"); + expectedInputRefs.put("default.table_a1@namespace1", "table_a1"); + expectedInputRefs.put("default.table_a2@namespace1", "table_a2"); for (Referenceable ref : refs.getInputs()) { final String qName = (String) ref.get(ATTR_QUALIFIED_NAME); assertTrue(expectedInputRefs.containsKey(qName)); @@ -202,7 +202,7 @@ public class TestHive2JDBC { Referenceable ref = refs.getOutputs().iterator().next(); assertEquals("hive_table", ref.getTypeName()); assertEquals("table_b1", ref.get(ATTR_NAME)); - assertEquals("database_b.table_b1@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("database_b.table_b1@namespace1", ref.get(ATTR_QUALIFIED_NAME)); } /** @@ -222,11 +222,11 @@ public class TestHive2JDBC { when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("table_A1, table_A2"); when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_B.table_B1"); - final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); - when(clusterResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"), eq("2.example.com"))).thenReturn("cluster1"); + final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class); + when(namespaceResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"), eq("2.example.com"))).thenReturn("namespace1"); final AnalysisContext context = Mockito.mock(AnalysisContext.class); - when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.getNamespaceResolver()).thenReturn(namespaceResolvers); final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType()); assertNotNull(analyzer); @@ -235,8 +235,8 @@ public class TestHive2JDBC { assertEquals(2, refs.getInputs().size()); // QualifiedName : Name final Map expectedInputRefs = new HashMap<>(); - expectedInputRefs.put("default.table_a1@cluster1", "table_a1"); - expectedInputRefs.put("default.table_a2@cluster1", "table_a2"); + expectedInputRefs.put("default.table_a1@namespace1", "table_a1"); + expectedInputRefs.put("default.table_a2@namespace1", "table_a2"); for (Referenceable ref : refs.getInputs()) { final String qName = (String) ref.get(ATTR_QUALIFIED_NAME); assertTrue(expectedInputRefs.containsKey(qName)); @@ -247,7 +247,7 @@ public class TestHive2JDBC { Referenceable ref = refs.getOutputs().iterator().next(); assertEquals("hive_table", ref.getTypeName()); assertEquals("table_b1", ref.get(ATTR_NAME)); - assertEquals("database_b.table_b1@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("database_b.table_b1@namespace1", ref.get(ATTR_QUALIFIED_NAME)); } /** @@ -265,11 +265,11 @@ public class TestHive2JDBC { when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("table_A1, table_A2"); when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_B.table_B1"); - final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); - when(clusterResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"))).thenReturn("cluster1"); + final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class); + when(namespaceResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"))).thenReturn("namespace1"); final AnalysisContext context = Mockito.mock(AnalysisContext.class); - when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.getNamespaceResolver()).thenReturn(namespaceResolvers); final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType()); assertNotNull(analyzer); @@ -278,8 +278,8 @@ public class TestHive2JDBC { assertEquals(2, refs.getInputs().size()); // QualifiedName : Name final Map expectedInputRefs = new HashMap<>(); - expectedInputRefs.put("some_database.table_a1@cluster1", "table_a1"); - expectedInputRefs.put("some_database.table_a2@cluster1", "table_a2"); + expectedInputRefs.put("some_database.table_a1@namespace1", "table_a1"); + expectedInputRefs.put("some_database.table_a2@namespace1", "table_a2"); for (Referenceable ref : refs.getInputs()) { final String qName = (String) ref.get(ATTR_QUALIFIED_NAME); assertTrue(expectedInputRefs.containsKey(qName)); @@ -290,7 +290,7 @@ public class TestHive2JDBC { Referenceable ref = refs.getOutputs().iterator().next(); assertEquals("hive_table", ref.getTypeName()); assertEquals("table_b1", ref.get(ATTR_NAME)); - assertEquals("database_b.table_b1@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("database_b.table_b1@namespace1", ref.get(ATTR_QUALIFIED_NAME)); } } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java index b8a68fbbe5..1b16c8b861 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java @@ -21,7 +21,7 @@ import org.apache.nifi.atlas.provenance.AnalysisContext; import org.apache.nifi.atlas.provenance.DataSetRefs; import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer; import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory; -import org.apache.nifi.atlas.resolver.ClusterResolvers; +import org.apache.nifi.atlas.resolver.NamespaceResolvers; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.junit.Test; @@ -46,11 +46,11 @@ public class TestKafkaTopic { when(record.getTransitUri()).thenReturn(transitUri); when(record.getEventType()).thenReturn(ProvenanceEventType.SEND); - final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); - when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class); + when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1"); final AnalysisContext context = Mockito.mock(AnalysisContext.class); - when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.getNamespaceResolver()).thenReturn(namespaceResolvers); final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType()); assertNotNull(analyzer); @@ -61,7 +61,7 @@ public class TestKafkaTopic { Referenceable ref = refs.getOutputs().iterator().next(); assertEquals("topicA", ref.get(ATTR_NAME)); assertEquals("topicA", ref.get("topic")); - assertEquals("topicA@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("topicA@namespace1", ref.get(ATTR_QUALIFIED_NAME)); } @Test @@ -73,11 +73,11 @@ public class TestKafkaTopic { when(record.getTransitUri()).thenReturn(transitUri); when(record.getEventType()).thenReturn(ProvenanceEventType.SEND); - final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); - when(clusterResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"))).thenReturn("cluster1"); + final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class); + when(namespaceResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"))).thenReturn("namespace1"); final AnalysisContext context = Mockito.mock(AnalysisContext.class); - when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.getNamespaceResolver()).thenReturn(namespaceResolvers); final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType()); assertNotNull(analyzer); @@ -88,7 +88,7 @@ public class TestKafkaTopic { Referenceable ref = refs.getOutputs().iterator().next(); assertEquals("topicA", ref.get(ATTR_NAME)); assertEquals("topicA", ref.get("topic")); - assertEquals("topicA@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("topicA@namespace1", ref.get(ATTR_QUALIFIED_NAME)); } @Test @@ -100,11 +100,11 @@ public class TestKafkaTopic { when(record.getTransitUri()).thenReturn(transitUri); when(record.getEventType()).thenReturn(ProvenanceEventType.RECEIVE); - final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); - when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class); + when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1"); final AnalysisContext context = Mockito.mock(AnalysisContext.class); - when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.getNamespaceResolver()).thenReturn(namespaceResolvers); final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType()); assertNotNull(analyzer); @@ -116,7 +116,7 @@ public class TestKafkaTopic { assertEquals("kafka_topic", ref.getTypeName()); assertEquals("topicA", ref.get(ATTR_NAME)); assertEquals("topicA", ref.get("topic")); - assertEquals("topicA@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("topicA@namespace1", ref.get(ATTR_QUALIFIED_NAME)); } @Test @@ -128,11 +128,11 @@ public class TestKafkaTopic { when(record.getTransitUri()).thenReturn(transitUri); when(record.getEventType()).thenReturn(ProvenanceEventType.RECEIVE); - final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); - when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class); + when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1"); final AnalysisContext context = Mockito.mock(AnalysisContext.class); - when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.getNamespaceResolver()).thenReturn(namespaceResolvers); final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType()); assertNotNull(analyzer); @@ -144,7 +144,7 @@ public class TestKafkaTopic { assertEquals("kafka_topic", ref.getTypeName()); assertEquals("topicA", ref.get(ATTR_NAME)); assertEquals("topicA", ref.get("topic")); - assertEquals("topicA@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("topicA@namespace1", ref.get(ATTR_QUALIFIED_NAME)); } } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java index e4799b36d3..71b02841e4 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java @@ -22,7 +22,7 @@ import org.apache.nifi.atlas.provenance.DataSetRefs; import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer; import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory; import org.apache.nifi.atlas.reporting.ITReportLineageToAtlas; -import org.apache.nifi.atlas.resolver.ClusterResolvers; +import org.apache.nifi.atlas.resolver.NamespaceResolvers; import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes; import org.apache.nifi.provenance.ProvenanceEventRecord; @@ -60,8 +60,8 @@ public class TestNiFiRemotePort { when(sendEvent.getTransitUri()).thenReturn(transitUri); when(sendEvent.getEventType()).thenReturn(ProvenanceEventType.SEND); - final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); - when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class); + when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1"); final List connections = new ArrayList<>(); final ConnectionStatus connection = new ConnectionStatus(); @@ -70,7 +70,7 @@ public class TestNiFiRemotePort { connections.add(connection); final AnalysisContext context = Mockito.mock(AnalysisContext.class); - when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.getNamespaceResolver()).thenReturn(namespaceResolvers); when(context.findConnectionTo(matches("port-guid"))).thenReturn(connections); final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, sendEvent.getEventType()); @@ -86,7 +86,7 @@ public class TestNiFiRemotePort { Referenceable ref = refs.getOutputs().iterator().next(); assertEquals(TYPE_NIFI_INPUT_PORT, ref.getTypeName()); assertEquals("inputPortA", ref.get(ATTR_NAME)); - assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("port-guid@namespace1", ref.get(ATTR_QUALIFIED_NAME)); } @Test @@ -99,8 +99,8 @@ public class TestNiFiRemotePort { when(record.getTransitUri()).thenReturn(transitUri); when(record.getEventType()).thenReturn(ProvenanceEventType.RECEIVE); - final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); - when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class); + when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1"); final List connections = new ArrayList<>(); final ConnectionStatus connection = new ConnectionStatus(); @@ -109,7 +109,7 @@ public class TestNiFiRemotePort { connections.add(connection); final AnalysisContext context = Mockito.mock(AnalysisContext.class); - when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.getNamespaceResolver()).thenReturn(namespaceResolvers); when(context.findConnectionFrom(matches("port-guid"))).thenReturn(connections); final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, record.getEventType()); @@ -121,7 +121,7 @@ public class TestNiFiRemotePort { Referenceable ref = refs.getInputs().iterator().next(); assertEquals(TYPE_NIFI_OUTPUT_PORT, ref.getTypeName()); assertEquals("outputPortA", ref.get(ATTR_NAME)); - assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("port-guid@namespace1", ref.get(ATTR_QUALIFIED_NAME)); } @Test @@ -138,8 +138,8 @@ public class TestNiFiRemotePort { when(sendEvent.getEventType()).thenReturn(ProvenanceEventType.SEND); when(sendEvent.getAttribute(SiteToSiteAttributes.S2S_PORT_ID.key())).thenReturn("remote-port-guid"); - final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); - when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class); + when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1"); final List connections = new ArrayList<>(); final ConnectionStatus connection = new ConnectionStatus(); @@ -148,7 +148,7 @@ public class TestNiFiRemotePort { connections.add(connection); final AnalysisContext context = Mockito.mock(AnalysisContext.class); - when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.getNamespaceResolver()).thenReturn(namespaceResolvers); when(context.findConnectionTo(matches("s2s-client-component-guid"))).thenReturn(connections); final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, sendEvent.getEventType()); @@ -164,7 +164,7 @@ public class TestNiFiRemotePort { Referenceable ref = refs.getOutputs().iterator().next(); assertEquals(TYPE_NIFI_INPUT_PORT, ref.getTypeName()); assertEquals("inputPortA", ref.get(ATTR_NAME)); - assertEquals("remote-port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("remote-port-guid@namespace1", ref.get(ATTR_QUALIFIED_NAME)); } @Test @@ -180,8 +180,8 @@ public class TestNiFiRemotePort { when(record.getEventType()).thenReturn(ProvenanceEventType.RECEIVE); when(record.getAttribute(SiteToSiteAttributes.S2S_PORT_ID.key())).thenReturn("remote-port-guid"); - final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); - when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class); + when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1"); final List connections = new ArrayList<>(); final ConnectionStatus connection = new ConnectionStatus(); @@ -190,7 +190,7 @@ public class TestNiFiRemotePort { connections.add(connection); final AnalysisContext context = Mockito.mock(AnalysisContext.class); - when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.getNamespaceResolver()).thenReturn(namespaceResolvers); when(context.findConnectionFrom(matches("s2s-client-component-guid"))).thenReturn(connections); final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, record.getEventType()); @@ -202,7 +202,7 @@ public class TestNiFiRemotePort { Referenceable ref = refs.getInputs().iterator().next(); assertEquals(TYPE_NIFI_OUTPUT_PORT, ref.getTypeName()); assertEquals("outputPortA", ref.get(ATTR_NAME)); - assertEquals("remote-port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("remote-port-guid@namespace1", ref.get(ATTR_QUALIFIED_NAME)); } } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRootGroupPort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRootGroupPort.java index 84777afb0a..d5a9541e61 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRootGroupPort.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRootGroupPort.java @@ -22,7 +22,7 @@ import org.apache.nifi.atlas.provenance.DataSetRefs; import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer; import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory; import org.apache.nifi.atlas.reporting.ITReportLineageToAtlas; -import org.apache.nifi.atlas.resolver.ClusterResolvers; +import org.apache.nifi.atlas.resolver.NamespaceResolvers; import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; @@ -59,8 +59,8 @@ public class TestNiFiRootGroupPort { when(receiveEvent.getTransitUri()).thenReturn(transitUri); when(receiveEvent.getEventType()).thenReturn(ProvenanceEventType.RECEIVE); - final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); - when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class); + when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1"); final List connections = new ArrayList<>(); final ConnectionStatus connection = new ConnectionStatus(); @@ -69,7 +69,7 @@ public class TestNiFiRootGroupPort { connections.add(connection); final AnalysisContext context = Mockito.mock(AnalysisContext.class); - when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.getNamespaceResolver()).thenReturn(namespaceResolvers); when(context.findConnectionFrom(matches("port-guid"))).thenReturn(connections); final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, receiveEvent.getEventType()); @@ -85,7 +85,7 @@ public class TestNiFiRootGroupPort { Referenceable ref = refs.getInputs().iterator().next(); assertEquals(TYPE_NIFI_INPUT_PORT, ref.getTypeName()); assertEquals("inputPortA", ref.get(ATTR_NAME)); - assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("port-guid@namespace1", ref.get(ATTR_QUALIFIED_NAME)); } @Test @@ -98,8 +98,8 @@ public class TestNiFiRootGroupPort { when(sendEvent.getTransitUri()).thenReturn(transitUri); when(sendEvent.getEventType()).thenReturn(ProvenanceEventType.SEND); - final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); - when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class); + when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1"); final List connections = new ArrayList<>(); final ConnectionStatus connection = new ConnectionStatus(); @@ -108,7 +108,7 @@ public class TestNiFiRootGroupPort { connections.add(connection); final AnalysisContext context = Mockito.mock(AnalysisContext.class); - when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.getNamespaceResolver()).thenReturn(namespaceResolvers); when(context.findConnectionTo(matches("port-guid"))).thenReturn(connections); final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, sendEvent.getEventType()); @@ -120,7 +120,7 @@ public class TestNiFiRootGroupPort { Referenceable ref = refs.getOutputs().iterator().next(); assertEquals(TYPE_NIFI_OUTPUT_PORT, ref.getTypeName()); assertEquals("outputPortA", ref.get(ATTR_NAME)); - assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("port-guid@namespace1", ref.get(ATTR_QUALIFIED_NAME)); } @Test @@ -135,8 +135,8 @@ public class TestNiFiRootGroupPort { when(receiveEvent.getTransitUri()).thenReturn(transitUri); when(receiveEvent.getEventType()).thenReturn(ProvenanceEventType.RECEIVE); - final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); - when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class); + when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1"); final List connections = new ArrayList<>(); final ConnectionStatus connection = new ConnectionStatus(); @@ -145,7 +145,7 @@ public class TestNiFiRootGroupPort { connections.add(connection); final AnalysisContext context = Mockito.mock(AnalysisContext.class); - when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.getNamespaceResolver()).thenReturn(namespaceResolvers); when(context.findConnectionFrom(matches("port-guid"))).thenReturn(connections); final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, receiveEvent.getEventType()); @@ -161,7 +161,7 @@ public class TestNiFiRootGroupPort { Referenceable ref = refs.getInputs().iterator().next(); assertEquals(TYPE_NIFI_INPUT_PORT, ref.getTypeName()); assertEquals("inputPortA", ref.get(ATTR_NAME)); - assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("port-guid@namespace1", ref.get(ATTR_QUALIFIED_NAME)); } @Test @@ -175,8 +175,8 @@ public class TestNiFiRootGroupPort { when(sendEvent.getTransitUri()).thenReturn(transitUri); when(sendEvent.getEventType()).thenReturn(ProvenanceEventType.SEND); - final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); - when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class); + when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1"); final List connections = new ArrayList<>(); final ConnectionStatus connection = new ConnectionStatus(); @@ -185,7 +185,7 @@ public class TestNiFiRootGroupPort { connections.add(connection); final AnalysisContext context = Mockito.mock(AnalysisContext.class); - when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.getNamespaceResolver()).thenReturn(namespaceResolvers); when(context.findConnectionTo(matches("port-guid"))).thenReturn(connections); final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, sendEvent.getEventType()); @@ -197,7 +197,7 @@ public class TestNiFiRootGroupPort { Referenceable ref = refs.getOutputs().iterator().next(); assertEquals(TYPE_NIFI_OUTPUT_PORT, ref.getTypeName()); assertEquals("outputPortA", ref.get(ATTR_NAME)); - assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("port-guid@namespace1", ref.get(ATTR_QUALIFIED_NAME)); } } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java index 0194cddb3c..d84751a2a2 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java @@ -21,7 +21,7 @@ import org.apache.nifi.atlas.provenance.AnalysisContext; import org.apache.nifi.atlas.provenance.DataSetRefs; import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer; import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory; -import org.apache.nifi.atlas.resolver.ClusterResolvers; +import org.apache.nifi.atlas.resolver.NamespaceResolvers; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.junit.Test; @@ -55,11 +55,11 @@ public class TestPutHiveStreaming { when(record.getEventType()).thenReturn(ProvenanceEventType.SEND); when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_A.table_A"); - final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); - when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class); + when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1"); final AnalysisContext context = Mockito.mock(AnalysisContext.class); - when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.getNamespaceResolver()).thenReturn(namespaceResolvers); final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType()); assertNotNull(analyzer); @@ -70,6 +70,6 @@ public class TestPutHiveStreaming { Referenceable ref = refs.getOutputs().iterator().next(); assertEquals("hive_table", ref.getTypeName()); assertEquals("table_a", ref.get(ATTR_NAME)); - assertEquals("database_a.table_a@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("database_a.table_a@namespace1", ref.get(ATTR_QUALIFIED_NAME)); } } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestUnknownDataSet.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestUnknownDataSet.java index 96623a5c4f..b984d0a18c 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestUnknownDataSet.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestUnknownDataSet.java @@ -21,7 +21,7 @@ import org.apache.nifi.atlas.provenance.AnalysisContext; import org.apache.nifi.atlas.provenance.DataSetRefs; import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer; import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory; -import org.apache.nifi.atlas.resolver.ClusterResolvers; +import org.apache.nifi.atlas.resolver.NamespaceResolvers; import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; @@ -50,15 +50,15 @@ public class TestUnknownDataSet { when(record.getComponentId()).thenReturn(processorId); when(record.getEventType()).thenReturn(ProvenanceEventType.CREATE); - final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); - when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class); + when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1"); final List connections = new ArrayList<>(); final AnalysisContext context = Mockito.mock(AnalysisContext.class); - when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.getNamespaceResolver()).thenReturn(namespaceResolvers); when(context.findConnectionTo(processorId)).thenReturn(connections); - when(context.getNiFiClusterName()).thenReturn("nifi-cluster"); + when(context.getNiFiNamespace()).thenReturn("test_namespace"); final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, null, record.getEventType()); assertNotNull(analyzer); @@ -69,7 +69,7 @@ public class TestUnknownDataSet { Referenceable ref = refs.getInputs().iterator().next(); assertEquals("nifi_data", ref.getTypeName()); assertEquals("GenerateFlowFile", ref.get(ATTR_NAME)); - assertEquals("processor-1234@nifi-cluster", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("processor-1234@test_namespace", ref.get(ATTR_QUALIFIED_NAME)); } @Test @@ -81,15 +81,15 @@ public class TestUnknownDataSet { when(record.getComponentId()).thenReturn(processorId); when(record.getEventType()).thenReturn(ProvenanceEventType.CREATE); - final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); - when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); + final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class); + when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1"); final List connections = new ArrayList<>(); // The content of connection is not important, just create an empty status. connections.add(new ConnectionStatus()); final AnalysisContext context = Mockito.mock(AnalysisContext.class); - when(context.getClusterResolver()).thenReturn(clusterResolvers); + when(context.getNamespaceResolver()).thenReturn(namespaceResolvers); when(context.findConnectionTo(processorId)).thenReturn(connections); final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, null, record.getEventType()); diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java index 55d45a26c5..a6e2f890ca 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java @@ -77,7 +77,7 @@ import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_PASSWOR import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_URLS; import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_USER; import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.LINEAGE_STRATEGY_COMPLETE_PATH; -import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_LINEAGE_STRATEGY; +import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.LINEAGE_STRATEGY; import static org.apache.nifi.atlas.reporting.SimpleProvenanceRecord.pr; import static org.apache.nifi.provenance.ProvenanceEventType.ATTRIBUTES_MODIFIED; import static org.apache.nifi.provenance.ProvenanceEventType.CREATE; @@ -778,7 +778,7 @@ public class ITReportLineageToAtlas { @Test public void testS2SSendComplete() throws Exception { final TestConfiguration tc = new TestConfiguration("S2SSend"); - tc.properties.put(NIFI_LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue()); + tc.properties.put(LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue()); testS2SSend(tc); @@ -837,7 +837,7 @@ public class ITReportLineageToAtlas { @Test public void testS2SSendCompleteRAW() throws Exception { final TestConfiguration tc = new TestConfiguration("S2SSendRAW"); - tc.properties.put(NIFI_LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue()); + tc.properties.put(LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue()); testS2SSendRAW(tc); @@ -1294,7 +1294,7 @@ public class ITReportLineageToAtlas { @Test public void testSimpleEventLevelCompletePath() throws Exception { final TestConfiguration tc = new TestConfiguration("SimpleEventLevel"); - tc.properties.put(NIFI_LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue()); + tc.properties.put(LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue()); final ProvenanceRecords prs = tc.provenanceRecords; String flowFileUUIDA = "A0000000-0000-0000"; @@ -1339,7 +1339,7 @@ public class ITReportLineageToAtlas { @Test public void testMergedEvents() throws Exception { final TestConfiguration tc = new TestConfiguration("MergedEvents"); - tc.properties.put(NIFI_LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue()); + tc.properties.put(LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue()); final ProvenanceRecords prs = tc.provenanceRecords; final String flowFileUUIDA = "A0000000-0000-0000"; final String flowFileUUIDB = "B0000000-0000-0000"; @@ -1422,7 +1422,7 @@ public class ITReportLineageToAtlas { @Test public void testRecordAndDataSetLevel() throws Exception { final TestConfiguration tc = new TestConfiguration("RecordAndDataSetLevel"); - tc.properties.put(NIFI_LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue()); + tc.properties.put(LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue()); final ProvenanceRecords prs = tc.provenanceRecords; // Publish part diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java index bb4297fe20..da39acc76f 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java @@ -39,10 +39,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.net.MalformedURLException; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -63,6 +66,7 @@ import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.KAFKA_BOOTSTR import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -86,7 +90,7 @@ public class TestReportLineageToAtlas { } @Test - public void validateAtlasUrls() throws Exception { + public void validateAtlasUrlsFromProperty() throws Exception { final MockProcessContext processContext = new MockProcessContext(testSubject); final MockValidationContext validationContext = new MockValidationContext(processContext); @@ -105,10 +109,11 @@ public class TestReportLineageToAtlas { } }; - // Default setting. - assertResults.accept(testSubject.validate(validationContext), - r -> assertTrue("Atlas URLs is required", !r.isValid())); + // Default setting or empty urls + assertTrue(processContext.isValid()); + processContext.removeProperty(ATLAS_URLS); + assertTrue(processContext.isValid()); // Invalid URL. processContext.setProperty(ATLAS_URLS, "invalid"); @@ -133,6 +138,165 @@ public class TestReportLineageToAtlas { r -> assertTrue("Atlas URLs is invalid", !r.isValid())); } + @Test + public void validateNoAtlasUrlsFromConfig() throws Exception { + // GIVEN + Properties atlasConf = new Properties(); + + Consumer assertion = e -> assertEquals( + "No Atlas URL has been specified! Set either the 'Atlas URLs' property on the processor or the 'atlas.rest.address' property in the atlas configuration file.", + e.getMessage() + ); + + // WHEN + // THEN + validateAtlasUrlsFromConfig(atlasConf, assertion); + } + + @Test + public void validateNoProtocolAtlasUrlsFromConfig() throws Exception { + // GIVEN + String atlasUrls = "noProtocolUrl, https://atlasUrl"; + + Properties atlasConf = new Properties(); + atlasConf.setProperty("atlas.rest.address", atlasUrls); + + Consumer assertion = e -> assertTrue( + "Expected " + MalformedURLException.class.getSimpleName() + " for " + atlasUrls + ", got " + e, + e.getCause() instanceof MalformedURLException + ); + + // WHEN + // THEN + validateAtlasUrlsFromConfig(atlasConf, assertion); + } + + private void validateAtlasUrlsFromConfig(Properties atlasConf, Consumer exceptionConsumer) throws Exception { + // GIVEN + Consumer> propertiesAdjustment = properties -> { + properties.put(ATLAS_CONF_CREATE, "false"); + properties.remove(ATLAS_URLS); + }; + + // WHEN + // THEN + testSetup( + atlasConf, + propertiesAdjustment, + () -> fail(), + e -> { + assertTrue("Expected a " + ProcessException.class.getSimpleName() + ", got " + e, e instanceof ProcessException); + exceptionConsumer.accept(e); + } + ); + } + + @Test + public void testCreateAtlasPropertiesWithAtlasURLs() throws Exception { + // GIVEN + String atlasUrls = "http://atlasUrl1,http://atlasUrl2"; + + Properties atlasConf = new Properties(); + + Consumer> propertiesAdjustment = properties -> { + properties.put(ATLAS_CONF_CREATE, "true"); + properties.put(ATLAS_URLS, atlasUrls); + }; + + Runnable assertion = () -> { + Properties atlasProperties = new Properties(); + final File atlasPropertiesFile = new File("target/atlasConfDir", "atlas-application.properties"); + try (InputStream in = new FileInputStream(atlasPropertiesFile)) { + atlasProperties.load(in); + } catch (Exception e) { + throw new AssertionError(e); + } + + assertEquals(atlasUrls, atlasProperties.getProperty("atlas.rest.address")); + }; + + + // WHEN + // THEN + testSetup( + atlasConf, + propertiesAdjustment, + assertion, + e -> { + throw new AssertionError(e); + } + ); + } + + @Test + public void testCreateAtlasPropertiesWithMetadataNamespace() throws Exception { + // GIVEN + String atlasMetadataNamespace = "namespace"; + + Properties atlasConf = new Properties(); + + Consumer> propertiesAdjustment = properties -> { + properties.put(ATLAS_CONF_CREATE, "true"); + properties.put(ATLAS_DEFAULT_CLUSTER_NAME, atlasMetadataNamespace); + }; + + Runnable assertion = () -> { + Properties atlasProperties = new Properties(); + final File atlasPropertiesFile = new File("target/atlasConfDir", "atlas-application.properties"); + try (InputStream in = new FileInputStream(atlasPropertiesFile)) { + atlasProperties.load(in); + } catch (Exception e) { + throw new AssertionError(e); + } + + assertEquals(atlasMetadataNamespace, atlasProperties.getProperty("atlas.metadata.namespace")); + }; + + + // WHEN + // THEN + testSetup( + atlasConf, + propertiesAdjustment, + assertion, + e -> { + throw new AssertionError(e); + } + ); + } + + private void testSetup( + Properties atlasConf, + Consumer> propertiesAdjustment, + Runnable onSuccess, Consumer exceptionConsumer + ) throws Exception { + // GIVEN + String atlasConfDir = createAtlasConfDir(); + + Map properties = initReportingTaskProperties(atlasConfDir); + propertiesAdjustment.accept(properties); + + saveAtlasConf(atlasConfDir, atlasConf); + + reportingContext = mock(ReportingContext.class); + when(reportingContext.getProperties()).thenReturn(properties); + when(reportingContext.getProperty(any())).then(invocation -> new MockPropertyValue(properties.get(invocation.getArguments()[0]))); + + ConfigurationContext configurationContext = new MockConfigurationContext(properties, null); + + testSubject.initialize(initializationContext); + + // WHEN + try { + testSubject.setup(configurationContext); + onSuccess.run(); + + // THEN + } catch (Exception e) { + exceptionConsumer.accept(e); + } + } + @Test public void testDefaultConnectAndReadTimeout() throws Exception { // GIVEN diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/resolver/TestRegexClusterResolver.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/resolver/TestRegexNamespaceResolver.java similarity index 67% rename from nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/resolver/TestRegexClusterResolver.java rename to nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/resolver/TestRegexNamespaceResolver.java index bccd8c0458..5bc9492f69 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/resolver/TestRegexClusterResolver.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/resolver/TestRegexNamespaceResolver.java @@ -30,7 +30,7 @@ import java.util.Map; import static org.mockito.Mockito.when; -public class TestRegexClusterResolver { +public class TestRegexNamespaceResolver { private PropertyContext context; private ValidationContext validationContext; @@ -45,7 +45,7 @@ public class TestRegexClusterResolver { @Test public void testEmptySettings() { setupMock(Collections.EMPTY_MAP); - final RegexClusterResolver resolver = new RegexClusterResolver(); + final RegexNamespaceResolver resolver = new RegexNamespaceResolver(); // It should be valid final Collection validationResults = resolver.validate(validationContext); @@ -56,16 +56,16 @@ public class TestRegexClusterResolver { } @Test - public void testInvalidClusterName() { + public void testInvalidNamespace() { final Map properties = new HashMap<>(); - properties.put(RegexClusterResolver.PATTERN_PROPERTY_PREFIX, ".*\\.example.com"); + properties.put(RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX, ".*\\.example.com"); setupMock(properties); - final RegexClusterResolver resolver = new RegexClusterResolver(); + final RegexNamespaceResolver resolver = new RegexNamespaceResolver(); final Collection validationResults = resolver.validate(validationContext); Assert.assertEquals(1, validationResults.size()); final ValidationResult validationResult = validationResults.iterator().next(); - Assert.assertEquals(RegexClusterResolver.PATTERN_PROPERTY_PREFIX, validationResult.getSubject()); + Assert.assertEquals(RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX, validationResult.getSubject()); try { resolver.configure(context); @@ -77,10 +77,10 @@ public class TestRegexClusterResolver { @Test public void testEmptyPattern() { final Map properties = new HashMap<>(); - final String propertyName = RegexClusterResolver.PATTERN_PROPERTY_PREFIX + "Cluster1"; + final String propertyName = RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX + "Namespace1"; properties.put(propertyName, ""); setupMock(properties); - final RegexClusterResolver resolver = new RegexClusterResolver(); + final RegexNamespaceResolver resolver = new RegexNamespaceResolver(); final Collection validationResults = resolver.validate(validationContext); Assert.assertEquals(1, validationResults.size()); @@ -97,61 +97,64 @@ public class TestRegexClusterResolver { @Test public void testSinglePattern() { final Map properties = new HashMap<>(); - final String propertyName = RegexClusterResolver.PATTERN_PROPERTY_PREFIX + "Cluster1"; + final String propertyName = RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX + "Namespace1"; properties.put(propertyName, "^.*\\.example.com$"); setupMock(properties); - final RegexClusterResolver resolver = new RegexClusterResolver(); + final RegexNamespaceResolver resolver = new RegexNamespaceResolver(); final Collection validationResults = resolver.validate(validationContext); Assert.assertEquals(0, validationResults.size()); resolver.configure(context); - Assert.assertEquals("Cluster1", resolver.fromHostNames("host1.example.com")); + Assert.assertEquals("Namespace1", resolver.fromHostNames("host1.example.com")); } @Test public void testMultiplePatterns() { final Map properties = new HashMap<>(); - final String propertyName = RegexClusterResolver.PATTERN_PROPERTY_PREFIX + "Cluster1"; + final String propertyName = RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX + "Namespace1"; // Hostname or local ip address, delimited with a whitespace properties.put(propertyName, "^.*\\.example.com$\n^192.168.1.[\\d]+$"); setupMock(properties); - final RegexClusterResolver resolver = new RegexClusterResolver(); + final RegexNamespaceResolver resolver = new RegexNamespaceResolver(); final Collection validationResults = resolver.validate(validationContext); Assert.assertEquals(0, validationResults.size()); resolver.configure(context); - Assert.assertEquals("Cluster1", resolver.fromHostNames("host1.example.com")); - Assert.assertEquals("Cluster1", resolver.fromHostNames("192.168.1.10")); - Assert.assertEquals("Cluster1", resolver.fromHostNames("192.168.1.22")); + Assert.assertEquals("Namespace1", resolver.fromHostNames("host1.example.com")); + Assert.assertEquals("Namespace1", resolver.fromHostNames("192.168.1.10")); + Assert.assertEquals("Namespace1", resolver.fromHostNames("192.168.1.22")); Assert.assertNull(resolver.fromHostNames("192.168.2.30")); } @Test - public void testMultipleClusters() { + public void testMultipleNamespaces() { + String namespace1 = "Namepsace1"; + String namespace2 = "Namespace2"; + final Map properties = new HashMap<>(); - final String c1PropertyName = RegexClusterResolver.PATTERN_PROPERTY_PREFIX + "Cluster1"; - final String c2PropertyName = RegexClusterResolver.PATTERN_PROPERTY_PREFIX + "Cluster2"; + final String namespace1PropertyName = RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX + namespace1; + final String namepsace2PropertyName = RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX + namespace2; // Hostname or local ip address - properties.put(c1PropertyName, "^.*\\.c1\\.example.com$ ^192.168.1.[\\d]+$"); - properties.put(c2PropertyName, "^.*\\.c2\\.example.com$ ^192.168.2.[\\d]+$"); + properties.put(namespace1PropertyName, "^.*\\.c1\\.example.com$ ^192.168.1.[\\d]+$"); + properties.put(namepsace2PropertyName, "^.*\\.c2\\.example.com$ ^192.168.2.[\\d]+$"); setupMock(properties); - final RegexClusterResolver resolver = new RegexClusterResolver(); + final RegexNamespaceResolver resolver = new RegexNamespaceResolver(); final Collection validationResults = resolver.validate(validationContext); Assert.assertEquals(0, validationResults.size()); resolver.configure(context); - Assert.assertEquals("Cluster1", resolver.fromHostNames("host1.c1.example.com")); - Assert.assertEquals("Cluster1", resolver.fromHostNames("192.168.1.10")); - Assert.assertEquals("Cluster1", resolver.fromHostNames("192.168.1.22")); - Assert.assertEquals("Cluster2", resolver.fromHostNames("host2.c2.example.com")); - Assert.assertEquals("Cluster2", resolver.fromHostNames("192.168.2.10")); - Assert.assertEquals("Cluster2", resolver.fromHostNames("192.168.2.22")); + Assert.assertEquals(namespace1, resolver.fromHostNames("host1.c1.example.com")); + Assert.assertEquals(namespace1, resolver.fromHostNames("192.168.1.10")); + Assert.assertEquals(namespace1, resolver.fromHostNames("192.168.1.22")); + Assert.assertEquals(namespace2, resolver.fromHostNames("host2.c2.example.com")); + Assert.assertEquals(namespace2, resolver.fromHostNames("192.168.2.10")); + Assert.assertEquals(namespace2, resolver.fromHostNames("192.168.2.22")); Assert.assertNull(resolver.fromHostNames("192.168.3.30")); }