From 2b461bbf295b0065ec1adacac4d3d14190e75f16 Mon Sep 17 00:00:00 2001 From: Peter Turcsanyi Date: Fri, 15 May 2020 15:01:22 +0200 Subject: [PATCH] NIFI-7452: Support adls_gen2_directory in Atlas reporting task Signed-off-by: Pierre Villard This closes #4636. --- .../analyzer/AzureADLSDirectory.java | 101 ++++++++++++ ...las.provenance.NiFiProvenanceEventAnalyzer | 1 + .../analyzer/TestAzureADLSDirectory.java | 151 ++++++++++++++++++ nifi-nar-bundles/nifi-atlas-bundle/pom.xml | 2 +- 4 files changed, 254 insertions(+), 1 deletion(-) create mode 100644 nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AzureADLSDirectory.java create mode 100644 nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAzureADLSDirectory.java diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AzureADLSDirectory.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AzureADLSDirectory.java new file mode 100644 index 0000000000..2ced136d1b --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AzureADLSDirectory.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas.provenance.analyzer; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.utils.AtlasPathExtractorUtil; +import org.apache.atlas.utils.PathExtractorContext; +import org.apache.atlas.v1.model.instance.Referenceable; +import org.apache.hadoop.fs.Path; +import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +import java.util.Map; + +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; + +/** + * Analyze a transit URI as an Azure ADLS Gen2 directory (skipping the file name). + *
  • qualifiedName=abfs://filesystem@account/path@namespace (example: abfs://myfilesystem@myaccount/mydir1/mydir2@ns1) + *
  • name=directory (example: mydir2) + */ +public class AzureADLSDirectory extends AbstractNiFiProvenanceEventAnalyzer { + + public static final String TYPE_DIRECTORY = "adls_gen2_directory"; + public static final String TYPE_CONTAINER = "adls_gen2_container"; + public static final String TYPE_ACCOUNT = "adls_gen2_account"; + + public static final String ATTR_PARENT = "parent"; + public static final String ATTR_ACCOUNT = "account"; + + @Override + public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) { + String transitUri = event.getTransitUri(); + if (transitUri == null) { + return null; + } + + Path path = new Path(transitUri); + + String namespace = context.getNamespaceResolver().fromHostNames(path.toUri().getHost()); + + PathExtractorContext pathExtractorContext = new PathExtractorContext(namespace); + AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(path, pathExtractorContext); + + // the last component of the URI is returned as a directory object but in fact it refers the filename + Referenceable fileRef = convertToReferenceable(entityWithExtInfo.getEntity(), pathExtractorContext.getKnownEntities()); + Referenceable parentRef = (Referenceable) fileRef.get(ATTR_PARENT); + + return parentRef != null ? singleDataSetRef(event.getComponentId(), event.getEventType(), parentRef) : null; + } + + @Override + public String targetTransitUriPattern() { + return "^abfs(s)?://.+@.+/.+$"; + } + + private Referenceable convertToReferenceable(AtlasEntity entity, Map knownEntities) { + if (entity == null) { + return null; + } + + Referenceable ref = new Referenceable(entity.getTypeName()); + + ref.set(ATTR_QUALIFIED_NAME, entity.getAttribute(ATTR_QUALIFIED_NAME)); + ref.set(ATTR_NAME, entity.getAttribute(ATTR_NAME)); + + if (TYPE_DIRECTORY.equals(entity.getTypeName())) { + AtlasObjectId parentObjectId = (AtlasObjectId) entity.getRelationshipAttribute(ATTR_PARENT); + if (parentObjectId != null) { + AtlasEntity parentEntity = knownEntities.get(parentObjectId.getUniqueAttributes().get(ATTR_QUALIFIED_NAME)); + ref.set(ATTR_PARENT, convertToReferenceable(parentEntity, knownEntities)); + } + } else if (TYPE_CONTAINER.equals(entity.getTypeName())) { + AtlasObjectId accountObjectId = (AtlasObjectId) entity.getRelationshipAttribute(ATTR_ACCOUNT); + if (accountObjectId != null) { + AtlasEntity accountEntity = knownEntities.get(accountObjectId.getUniqueAttributes().get(ATTR_QUALIFIED_NAME)); + ref.set(ATTR_ACCOUNT, convertToReferenceable(accountEntity, knownEntities)); + } + } + + return ref; + } +} diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer index a2e607110a..ede3abb3e7 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer @@ -25,6 +25,7 @@ org.apache.nifi.atlas.provenance.analyzer.HDFSPath org.apache.nifi.atlas.provenance.analyzer.HBaseTable org.apache.nifi.atlas.provenance.analyzer.FilePath org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory +org.apache.nifi.atlas.provenance.analyzer.AzureADLSDirectory # By event type, if none of above analyzers matches org.apache.nifi.atlas.provenance.analyzer.unknown.Create diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAzureADLSDirectory.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAzureADLSDirectory.java new file mode 100644 index 0000000000..2f7cdd5b43 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAzureADLSDirectory.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas.provenance.analyzer; + +import org.apache.atlas.v1.model.instance.Referenceable; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer; +import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory; +import org.apache.nifi.atlas.resolver.NamespaceResolver; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.provenance.analyzer.AzureADLSDirectory.ATTR_ACCOUNT; +import static org.apache.nifi.atlas.provenance.analyzer.AzureADLSDirectory.ATTR_PARENT; +import static org.apache.nifi.atlas.provenance.analyzer.AzureADLSDirectory.TYPE_ACCOUNT; +import static org.apache.nifi.atlas.provenance.analyzer.AzureADLSDirectory.TYPE_CONTAINER; +import static org.apache.nifi.atlas.provenance.analyzer.AzureADLSDirectory.TYPE_DIRECTORY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +public class TestAzureADLSDirectory { + + private static final ProvenanceEventType PROVENANCE_EVENT_TYPE = ProvenanceEventType.SEND; + private static final String ATLAS_NAMESPACE = "namespace1"; + private static final String ADLS_ACCOUNT = "account1"; + private static final String ADLS_FILESYSTEM = "filesystem1"; + private static final String ADLS_FILENAME = "file1"; + + @Test + public void testSimpleDirectory() { + String processorName = "PutHDFS"; + String path = "/dir1"; + + executeTest(processorName, path); + } + + @Test + public void testCompoundDirectory() { + String processorName = "PutHDFS"; + String path = "/dir1/dir2/dir3/dir4/dir5"; + + executeTest(processorName, path); + } + + @Test + public void testRootDirectory() { + String processorName = "PutHDFS"; + String path = ""; + + executeTest(processorName, path); + } + + @Test + public void testWithPutORC() { + String processorName = "PutORC"; + String path = "/dir1"; + + executeTest(processorName, path); + } + + public void executeTest(String processorName, String path) { + String transitUri = String.format("abfs://%s@%s.dfs.core.windows.net%s/%s", ADLS_FILESYSTEM, ADLS_ACCOUNT, path, ADLS_FILENAME); + + ProvenanceEventRecord provenanceEvent = mockProvenanceEvent(processorName, transitUri); + AnalysisContext analysisContext = mockAnalysisContext(); + + NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, PROVENANCE_EVENT_TYPE); + assertAnalyzer(analyzer); + + DataSetRefs refs = analyzer.analyze(analysisContext, provenanceEvent); + assertAnalysisResult(refs, path); + } + + private ProvenanceEventRecord mockProvenanceEvent(String processorName, String transitUri) { + ProvenanceEventRecord provenanceEvent = Mockito.mock(ProvenanceEventRecord.class); + + when(provenanceEvent.getComponentType()).thenReturn(processorName); + when(provenanceEvent.getTransitUri()).thenReturn(transitUri); + when(provenanceEvent.getEventType()).thenReturn(PROVENANCE_EVENT_TYPE); + + return provenanceEvent; + } + + private AnalysisContext mockAnalysisContext() { + NamespaceResolver namespaceResolver = Mockito.mock(NamespaceResolver.class); + when(namespaceResolver.fromHostNames(any())).thenReturn(ATLAS_NAMESPACE); + + AnalysisContext analysisContext = Mockito.mock(AnalysisContext.class); + when(analysisContext.getNamespaceResolver()).thenReturn(namespaceResolver); + + return analysisContext; + } + + private void assertAnalyzer(NiFiProvenanceEventAnalyzer analyzer) { + assertNotNull(analyzer); + assertEquals(AzureADLSDirectory.class, analyzer.getClass()); + } + + private void assertAnalysisResult(DataSetRefs refs, String path) { + assertEquals(0, refs.getInputs().size()); + assertEquals(1, refs.getOutputs().size()); + + Referenceable ref = refs.getOutputs().iterator().next(); + + String actualPath = path; + while (StringUtils.isNotEmpty(actualPath)) { + String directory = StringUtils.substringAfterLast(actualPath, "/"); + + assertEquals(TYPE_DIRECTORY, ref.getTypeName()); + assertEquals(String.format("abfs://%s@%s%s/@%s", ADLS_FILESYSTEM, ADLS_ACCOUNT, actualPath, ATLAS_NAMESPACE), ref.get(ATTR_QUALIFIED_NAME)); + assertEquals(directory, ref.get(ATTR_NAME)); + assertNotNull(ref.get(ATTR_PARENT)); + + ref = (Referenceable) ref.get(ATTR_PARENT); + actualPath = StringUtils.substringBeforeLast(actualPath, "/"); + } + + assertEquals(TYPE_CONTAINER, ref.getTypeName()); + assertEquals(String.format("abfs://%s@%s@%s", ADLS_FILESYSTEM, ADLS_ACCOUNT, ATLAS_NAMESPACE), ref.get(ATTR_QUALIFIED_NAME)); + assertEquals(ADLS_FILESYSTEM, ref.get(ATTR_NAME)); + assertNotNull(ref.get(ATTR_ACCOUNT)); + + ref = (Referenceable) ref.get(ATTR_ACCOUNT); + + assertEquals(TYPE_ACCOUNT, ref.getTypeName()); + assertEquals(String.format("abfs://%s@%s", ADLS_ACCOUNT, ATLAS_NAMESPACE), ref.get(ATTR_QUALIFIED_NAME)); + assertEquals(ADLS_ACCOUNT, ref.get(ATTR_NAME)); + } +} diff --git a/nifi-nar-bundles/nifi-atlas-bundle/pom.xml b/nifi-nar-bundles/nifi-atlas-bundle/pom.xml index 2c95c474b5..973f7c21e5 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-atlas-bundle/pom.xml @@ -26,7 +26,7 @@ pom - 2.0.0 + 2.1.0