diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java index 4562139127..f592710a5b 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java @@ -31,4 +31,5 @@ public interface AnalysisContext { ComputeLineageResult queryLineage(long eventId); ComputeLineageResult findParents(long eventId); ProvenanceEventRecord getProvenanceEvent(long eventId); + String getAwsS3ModelVersion(); } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java index 3ca9ff1d91..f889650125 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java @@ -36,12 +36,14 @@ public class StandardAnalysisContext implements AnalysisContext { private final NiFiFlow nifiFlow; private final NamespaceResolver namespaceResolver; private final ProvenanceRepository provenanceRepository; + private final String awsS3ModelVersion; public StandardAnalysisContext(NiFiFlow nifiFlow, NamespaceResolver namespaceResolver, - ProvenanceRepository provenanceRepository) { + ProvenanceRepository provenanceRepository, String awsS3ModelVersion) { this.nifiFlow = nifiFlow; this.namespaceResolver = namespaceResolver; 
this.provenanceRepository = provenanceRepository; + this.awsS3ModelVersion = awsS3ModelVersion; } @Override @@ -101,4 +103,8 @@ public class StandardAnalysisContext implements AnalysisContext { } } + @Override + public String getAwsS3ModelVersion() { + return awsS3ModelVersion; + } } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AwsS3Directory.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AwsS3Directory.java index af683fdc7c..4d58504666 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AwsS3Directory.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AwsS3Directory.java @@ -16,54 +16,80 @@ */ package org.apache.nifi.atlas.provenance.analyzer; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.utils.AtlasPathExtractorUtil; +import org.apache.atlas.utils.PathExtractorContext; import org.apache.atlas.v1.model.instance.Referenceable; -import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer; import org.apache.nifi.atlas.provenance.AnalysisContext; import org.apache.nifi.atlas.provenance.DataSetRefs; import org.apache.nifi.provenance.ProvenanceEventRecord; -import java.net.URI; +import java.util.Map; -import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName; -import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; /** * Analyze a transit URI as an AWS S3 directory (skipping the object name). - *
  • qualifiedName=s3a://bucket/path@namespace (example: s3a://mybucket/mydir@ns1) - *
  • name=/path (example: /mydir) + * The analyzer outputs a v1 or v2 AWS S3 directory entity depending on the 'AWS S3 Model Version' property configured on the reporting task. + *

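+ * <p>
+ * Atlas entity hierarchy v1: aws_s3_pseudo_dir -&gt; aws_s3_bucket
+ * <p>aws_s3_pseudo_dir
+ * <ul>
+ *   <li>qualifiedName=s3a://bucket/path@namespace (example: s3a://mybucket/mydir@ns1)
+ *   <li>name=/path (example: /mydir)
+ * </ul>
+ * <p>aws_s3_bucket
+ * <ul>
+ *   <li>qualifiedName=s3a://bucket@namespace (example: s3a://mybucket@ns1)
+ *   <li>name=bucket (example: mybucket)
+ * </ul>
+ * <p>
+ * Atlas entity hierarchy v2: aws_s3_v2_directory -&gt; aws_s3_v2_directory -&gt; ... -&gt; aws_s3_v2_bucket
+ * <p>aws_s3_v2_directory
+ * <ul>
+ *   <li>qualifiedName=s3a://bucket/path/@namespace (example: s3a://mybucket/mydir1/mydir2/@ns1)
+ *   <li>name=directory (example: mydir2)
+ * </ul>
+ * <p>aws_s3_v2_bucket
+ * <ul>
+ *   <li>qualifiedName=s3a://bucket@namespace (example: s3a://mybucket@ns1)
+ *   <li>name=bucket (example: mybucket)
+ * </ul>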
    */ public class AwsS3Directory extends AbstractNiFiProvenanceEventAnalyzer { - private static final String TYPE_DIRECTORY = "aws_s3_pseudo_dir"; - private static final String TYPE_BUCKET = "aws_s3_bucket"; + public static final String TYPE_DIRECTORY_V1 = AtlasPathExtractorUtil.AWS_S3_PSEUDO_DIR; + public static final String TYPE_BUCKET_V1 = AtlasPathExtractorUtil.AWS_S3_BUCKET; + public static final String ATTR_BUCKET_V1 = AtlasPathExtractorUtil.ATTRIBUTE_BUCKET; + public static final String ATTR_OBJECT_PREFIX_V1 = AtlasPathExtractorUtil.ATTRIBUTE_OBJECT_PREFIX; - public static final String ATTR_OBJECT_PREFIX = "objectPrefix"; - public static final String ATTR_BUCKET = "bucket"; + public static final String TYPE_DIRECTORY_V2 = AtlasPathExtractorUtil.AWS_S3_V2_PSEUDO_DIR; + public static final String TYPE_BUCKET_V2 = AtlasPathExtractorUtil.AWS_S3_V2_BUCKET; + public static final String ATTR_CONTAINER_V2 = AtlasPathExtractorUtil.ATTRIBUTE_CONTAINER; + public static final String ATTR_OBJECT_PREFIX_V2 = AtlasPathExtractorUtil.ATTRIBUTE_OBJECT_PREFIX; @Override public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) { - final String transitUri = event.getTransitUri(); + String transitUri = event.getTransitUri(); if (transitUri == null) { return null; } - final String directoryUri; - if (StringUtils.countMatches(transitUri, '/') > 3) { - // directory exists => drop last '/' and the file name - directoryUri = transitUri.substring(0, transitUri.lastIndexOf('/')); - } else { - // no directory => keep last '/', drop only the file name - directoryUri = transitUri.substring(0, transitUri.lastIndexOf('/') + 1); - } - final URI uri = parseUri(directoryUri); + String directoryUri = transitUri.substring(0, transitUri.lastIndexOf('/') + 1); - final String namespace = context.getNamespaceResolver().fromHostNames(uri.getHost()); + Path path = new Path(directoryUri); - final Referenceable ref = createDirectoryRef(uri, namespace); + String namespace = 
context.getNamespaceResolver().fromHostNames(path.toUri().getHost()); - return singleDataSetRef(event.getComponentId(), event.getEventType(), ref); + PathExtractorContext pathExtractorContext = new PathExtractorContext(namespace, context.getAwsS3ModelVersion()); + AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(path, pathExtractorContext); + + Referenceable ref = convertToReferenceable(entityWithExtInfo.getEntity(), pathExtractorContext.getKnownEntities()); + + return ref != null ? singleDataSetRef(event.getComponentId(), event.getEventType(), ref) : null; } @Override @@ -71,23 +97,34 @@ public class AwsS3Directory extends AbstractNiFiProvenanceEventAnalyzer { return "^s3a://.+/.+$"; } - private Referenceable createDirectoryRef(URI uri, String namespace) { - final Referenceable ref = new Referenceable(TYPE_DIRECTORY); + private Referenceable convertToReferenceable(AtlasEntity entity, Map knownEntities) { + if (entity == null) { + return null; + } - ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, uri.toString().toLowerCase())); - ref.set(ATTR_NAME, uri.getPath().toLowerCase()); - ref.set(ATTR_OBJECT_PREFIX, uri.getPath().toLowerCase()); - ref.set(ATTR_BUCKET, createBucketRef(uri, namespace)); + Referenceable ref = createReferenceable(entity); + + if (TYPE_DIRECTORY_V1.equals(entity.getTypeName())) { + AtlasObjectId bucketObjectId = (AtlasObjectId) entity.getRelationshipAttribute(ATTR_BUCKET_V1); + if (bucketObjectId != null) { + AtlasEntity bucketEntity = knownEntities.get(bucketObjectId.getUniqueAttributes().get(ATTR_QUALIFIED_NAME)); + ref.set(ATTR_BUCKET_V1, convertToReferenceable(bucketEntity, knownEntities)); + } + } else if (TYPE_DIRECTORY_V2.equals(entity.getTypeName())) { + AtlasObjectId containerObjectId = (AtlasObjectId) entity.getRelationshipAttribute(ATTR_CONTAINER_V2); + if (containerObjectId != null) { + AtlasEntity containerEntity = 
knownEntities.get(containerObjectId.getUniqueAttributes().get(ATTR_QUALIFIED_NAME)); + ref.set(ATTR_CONTAINER_V2, convertToReferenceable(containerEntity, knownEntities)); + } + } return ref; } - private Referenceable createBucketRef(URI uri, String namespace) { - final Referenceable ref = new Referenceable(TYPE_BUCKET); - - ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, String.format("%s://%s", uri.getScheme(), uri.getAuthority()))); - ref.set(ATTR_NAME, uri.getAuthority()); - + private Referenceable createReferenceable(AtlasEntity entity) { + Referenceable ref = new Referenceable(entity.getTypeName()); + ref.setValues(entity.getAttributes()); return ref; } + } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AzureADLSDirectory.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AzureADLSDirectory.java index 2ced136d1b..b4bfadb71e 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AzureADLSDirectory.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AzureADLSDirectory.java @@ -34,17 +34,32 @@ import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; /** * Analyze a transit URI as an Azure ADLS Gen2 directory (skipping the file name). - *
  • qualifiedName=abfs://filesystem@account/path@namespace (example: abfs://myfilesystem@myaccount/mydir1/mydir2@ns1) - *
  • name=directory (example: mydir2) + *

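+ * <p>
+ * Atlas entity hierarchy: adls_gen2_directory -&gt; adls_gen2_directory -&gt; ... -&gt; adls_gen2_container -&gt; adls_gen2_account
+ * <p>adls_gen2_directory
+ * <ul>
+ *   <li>qualifiedName=abfs://filesystem@account/path@namespace (example: abfs://myfilesystem@myaccount/mydir1/mydir2@ns1)
+ *   <li>name=directory (example: mydir2)
+ * </ul>
+ * <p>adls_gen2_container
+ * <p>adls_gen2_account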
    */ public class AzureADLSDirectory extends AbstractNiFiProvenanceEventAnalyzer { - public static final String TYPE_DIRECTORY = "adls_gen2_directory"; - public static final String TYPE_CONTAINER = "adls_gen2_container"; - public static final String TYPE_ACCOUNT = "adls_gen2_account"; + public static final String TYPE_DIRECTORY = AtlasPathExtractorUtil.ADLS_GEN2_DIRECTORY; + public static final String TYPE_CONTAINER = AtlasPathExtractorUtil.ADLS_GEN2_CONTAINER; + public static final String TYPE_ACCOUNT = AtlasPathExtractorUtil.ADLS_GEN2_ACCOUNT; - public static final String ATTR_PARENT = "parent"; - public static final String ATTR_ACCOUNT = "account"; + public static final String ATTR_PARENT = AtlasPathExtractorUtil.ATTRIBUTE_PARENT; + public static final String ATTR_ACCOUNT = AtlasPathExtractorUtil.ATTRIBUTE_ACCOUNT; @Override public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) { diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java index f6ccdd7836..9a5982bb28 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java @@ -21,6 +21,7 @@ import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasServiceException; import org.apache.atlas.hook.AtlasHook; import org.apache.atlas.security.SecurityProperties; +import org.apache.atlas.utils.AtlasPathExtractorUtil; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.alias.CredentialProvider; @@ -325,6 +326,22 @@ public class ReportLineageToAtlas extends 
AbstractReportingTask { .defaultValue(LINEAGE_STRATEGY_SIMPLE_PATH.getValue()) .build(); + static final AllowableValue AWS_S3_MODEL_VERSION_V1 = new AllowableValue("v1", "v1", + "Creates AWS S3 directory entities version 1 (aws_s3_pseudo_dir)."); + static final AllowableValue AWS_S3_MODEL_VERSION_V2 = new AllowableValue(AtlasPathExtractorUtil.AWS_S3_ATLAS_MODEL_VERSION_V2, "v2", + "Creates AWS S3 directory entities version 2 (aws_s3_v2_directory)."); + + static final PropertyDescriptor AWS_S3_MODEL_VERSION = new PropertyDescriptor.Builder() + .name("aws-s3-model-version") + .displayName("AWS S3 Model Version") + .description("Specifies what type of AWS S3 directory entities will be created in Atlas for s3a:// transit URIs (eg. PutHDFS with S3 integration)." + + " NOTE: It is strongly recommended to keep using the same AWS S3 entity model version once this reporting task started to keep Atlas data clean." + + " Switching versions will not delete existing Atlas entities created by the old version, nor migrate them to the new version.") + .required(true) + .allowableValues(AWS_S3_MODEL_VERSION_V1, AWS_S3_MODEL_VERSION_V2) + .defaultValue(AWS_S3_MODEL_VERSION_V2.getValue()) + .build(); + private static final String ATLAS_PROPERTIES_FILENAME = "atlas-application.properties"; private static final String ATLAS_PROPERTY_CLIENT_CONNECT_TIMEOUT_MS = "atlas.client.connectTimeoutMSecs"; private static final String ATLAS_PROPERTY_CLIENT_READ_TIMEOUT_MS = "atlas.client.readTimeoutMSecs"; @@ -387,6 +404,9 @@ public class ReportLineageToAtlas extends AbstractReportingTask { properties.add(ATLAS_CONNECT_TIMEOUT); properties.add(ATLAS_READ_TIMEOUT); + // Provenance event analyzer specific properties + properties.add(AWS_S3_MODEL_VERSION); + return properties; } @@ -848,9 +868,10 @@ public class ReportLineageToAtlas extends AbstractReportingTask { private void consumeNiFiProvenanceEvents(ReportingContext context, NiFiFlow nifiFlow) { final EventAccess eventAccess = 
context.getEventAccess(); + final String awsS3ModelVersion = context.getProperty(AWS_S3_MODEL_VERSION).getValue(); final AnalysisContext analysisContext = new StandardAnalysisContext(nifiFlow, namespaceResolvers, // FIXME: This class cast shouldn't be necessary to query lineage. Possible refactor target in next major update. - (ProvenanceRepository)eventAccess.getProvenanceRepository()); + (ProvenanceRepository)eventAccess.getProvenanceRepository(), awsS3ModelVersion); consumer.consumeEvents(context, (componentMapHolder, events) -> { for (ProvenanceEventRecord event : events) { try { diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3Directory.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/AbstractTestAwsS3Directory.java similarity index 58% rename from nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3Directory.java rename to nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/AbstractTestAwsS3Directory.java index d9343280f6..5e0db8326a 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3Directory.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/AbstractTestAwsS3Directory.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.atlas.provenance.analyzer; -import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.nifi.atlas.provenance.AnalysisContext; import org.apache.nifi.atlas.provenance.DataSetRefs; import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer; @@ -24,56 +23,25 @@ import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory; import 
org.apache.nifi.atlas.resolver.NamespaceResolver; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; -import org.junit.Test; import org.mockito.Mockito; -import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; -import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; -public class TestAwsS3Directory { +public abstract class AbstractTestAwsS3Directory { - private static final ProvenanceEventType PROVENANCE_EVENT_TYPE = ProvenanceEventType.SEND; - private static final String ATLAS_NAMESPACE = "namespace1"; - private static final String AWS_BUCKET = "bucket1"; - private static final String AWS_FILENAME = "file1"; + protected static final ProvenanceEventType PROVENANCE_EVENT_TYPE = ProvenanceEventType.SEND; + protected static final String ATLAS_NAMESPACE = "namespace1"; + protected static final String AWS_BUCKET = "bucket1"; + protected static final String AWS_FILENAME = "file1"; - @Test - public void testSimpleDirectory() { - String processorName = "PutHDFS"; - String directory = "/dir1"; + protected abstract String getAwsS3ModelVersion(); - executeTest(processorName, directory); - } + protected abstract void assertAnalysisResult(DataSetRefs refs, String directory); - @Test - public void testCompoundDirectory() { - String processorName = "PutHDFS"; - String directory = "/dir1/dir2/dir3/dir4/dir5"; - - executeTest(processorName, directory); - } - - @Test - public void testRootDirectory() { - String processorName = "PutHDFS"; - String directory = "/"; - - executeTest(processorName, directory); - } - - @Test - public void testWithPutORC() { - String processorName = "PutORC"; - String directory = "/dir1"; - - executeTest(processorName, directory); - } - - public void executeTest(String processorName, String directory) { + 
protected void executeTest(String processorName, String directory) { String transitUri = createTransitUri(directory); ProvenanceEventRecord provenanceEvent = mockProvenanceEvent(processorName, transitUri); @@ -110,6 +78,7 @@ public class TestAwsS3Directory { AnalysisContext analysisContext = Mockito.mock(AnalysisContext.class); when(analysisContext.getNamespaceResolver()).thenReturn(namespaceResolver); + when(analysisContext.getAwsS3ModelVersion()).thenReturn(getAwsS3ModelVersion()); return analysisContext; } @@ -118,25 +87,4 @@ public class TestAwsS3Directory { assertNotNull(analyzer); assertEquals(AwsS3Directory.class, analyzer.getClass()); } - - private void assertAnalysisResult(DataSetRefs refs, String directory) { - String expectedDirectoryQualifiedName = String.format("s3a://%s%s@%s", AWS_BUCKET, directory, ATLAS_NAMESPACE); - String expectedBucketQualifiedName = String.format("s3a://%s@%s", AWS_BUCKET, ATLAS_NAMESPACE); - - assertEquals(0, refs.getInputs().size()); - assertEquals(1, refs.getOutputs().size()); - - Referenceable directoryRef = refs.getOutputs().iterator().next(); - - assertEquals("aws_s3_pseudo_dir", directoryRef.getTypeName()); - assertEquals(expectedDirectoryQualifiedName, directoryRef.get(ATTR_QUALIFIED_NAME)); - assertEquals(directory, directoryRef.get(ATTR_NAME)); - assertEquals(directory, directoryRef.get("objectPrefix")); - - Referenceable bucketRef = (Referenceable) directoryRef.get("bucket"); - assertNotNull(bucketRef); - assertEquals("aws_s3_bucket", bucketRef.getTypeName()); - assertEquals(expectedBucketQualifiedName, bucketRef.get(ATTR_QUALIFIED_NAME)); - assertEquals(AWS_BUCKET, bucketRef.get(ATTR_NAME)); - } } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3DirectoryV1.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3DirectoryV1.java new file mode 100644 
index 0000000000..69f5d886c9 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3DirectoryV1.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance.analyzer; + +import org.apache.atlas.v1.model.instance.Referenceable; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.junit.Test; + +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory.ATTR_BUCKET_V1; +import static org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory.ATTR_OBJECT_PREFIX_V1; +import static org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory.TYPE_BUCKET_V1; +import static org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory.TYPE_DIRECTORY_V1; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class TestAwsS3DirectoryV1 extends AbstractTestAwsS3Directory { + + @Override + protected String getAwsS3ModelVersion() { + return "v1"; + } + + @Test + public void testSimpleDirectory() { + String processorName = "PutHDFS"; + String dirPath = "/dir1"; + + executeTest(processorName, dirPath); + } + + @Test + public void testCompoundDirectory() { + String processorName = "PutHDFS"; + String dirPath = "/dir1/dir2/dir3/dir4/dir5"; + + executeTest(processorName, dirPath); + } + + @Test + public void testRootDirectory() { + String processorName = "PutHDFS"; + String dirPath = "/"; + + executeTest(processorName, dirPath); + } + + @Test + public void testWithPutORC() { + String processorName = "PutORC"; + String dirPath = "/dir1"; + + executeTest(processorName, dirPath); + } + + @Override + protected void assertAnalysisResult(DataSetRefs refs, String dirPath) { + String expectedDirectoryQualifiedName = String.format("s3a://%s%s@%s", AWS_BUCKET, dirPath, ATLAS_NAMESPACE); + String expectedBucketQualifiedName = String.format("s3a://%s@%s", AWS_BUCKET, ATLAS_NAMESPACE); + + assertEquals(0, refs.getInputs().size()); + assertEquals(1, refs.getOutputs().size()); + + Referenceable directoryRef = 
refs.getOutputs().iterator().next(); + + assertEquals(TYPE_DIRECTORY_V1, directoryRef.getTypeName()); + assertEquals(expectedDirectoryQualifiedName, directoryRef.get(ATTR_QUALIFIED_NAME)); + assertEquals(dirPath, directoryRef.get(ATTR_NAME)); + assertEquals(dirPath, directoryRef.get(ATTR_OBJECT_PREFIX_V1)); + + Referenceable bucketRef = (Referenceable) directoryRef.get(ATTR_BUCKET_V1); + assertNotNull(bucketRef); + assertEquals(TYPE_BUCKET_V1, bucketRef.getTypeName()); + assertEquals(expectedBucketQualifiedName, bucketRef.get(ATTR_QUALIFIED_NAME)); + assertEquals(AWS_BUCKET, bucketRef.get(ATTR_NAME)); + } +} diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3DirectoryV2.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3DirectoryV2.java new file mode 100644 index 0000000000..0694b6afac --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3DirectoryV2.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance.analyzer; + +import org.apache.atlas.utils.AtlasPathExtractorUtil; +import org.apache.atlas.v1.model.instance.Referenceable; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.junit.Test; + +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory.ATTR_CONTAINER_V2; +import static org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory.ATTR_OBJECT_PREFIX_V2; +import static org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory.TYPE_BUCKET_V2; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class TestAwsS3DirectoryV2 extends AbstractTestAwsS3Directory { + + @Override + protected String getAwsS3ModelVersion() { + return AtlasPathExtractorUtil.AWS_S3_ATLAS_MODEL_VERSION_V2; + } + + @Test + public void testSimpleDirectory() { + String processorName = "PutHDFS"; + String dirPath = "/dir1"; + + executeTest(processorName, dirPath); + } + + @Test + public void testCompoundDirectory() { + String processorName = "PutHDFS"; + String dirPath = "/dir1/dir2/dir3/dir4/dir5"; + + executeTest(processorName, dirPath); + } + + @Test + public void testRootDirectory() { + String processorName = "PutHDFS"; + String dirPath = "/"; + + executeTest(processorName, dirPath); + } + + @Test + public void testWithPutORC() { + String processorName = "PutORC"; + String dirPath = "/dir1"; + + executeTest(processorName, dirPath); + } + + protected void assertAnalysisResult(DataSetRefs refs, String dirPath) { + assertEquals(0, refs.getInputs().size()); + assertEquals(1, refs.getOutputs().size()); + + Referenceable ref = refs.getOutputs().iterator().next(); + + String actualPath = dirPath; + while (StringUtils.isNotEmpty(actualPath) && !"/".equals(actualPath)) { + String directory = 
StringUtils.substringAfterLast(actualPath, "/"); + + assertEquals(AwsS3Directory.TYPE_DIRECTORY_V2, ref.getTypeName()); + assertEquals(String.format("s3a://%s%s/@%s", AWS_BUCKET, actualPath, ATLAS_NAMESPACE), ref.get(ATTR_QUALIFIED_NAME)); + assertEquals(directory, ref.get(ATTR_NAME)); + assertEquals(actualPath + "/", ref.get(ATTR_OBJECT_PREFIX_V2)); + assertNotNull(ref.get(ATTR_CONTAINER_V2)); + + ref = (Referenceable) ref.get(ATTR_CONTAINER_V2); + actualPath = StringUtils.substringBeforeLast(actualPath, "/"); + } + + assertEquals(TYPE_BUCKET_V2, ref.getTypeName()); + assertEquals(String.format("s3a://%s@%s", AWS_BUCKET, ATLAS_NAMESPACE), ref.get(ATTR_QUALIFIED_NAME)); + assertEquals(AWS_BUCKET, ref.get(ATTR_NAME)); + } +} diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAzureADLSDirectory.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAzureADLSDirectory.java index 2f7cdd5b43..583ad4e08f 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAzureADLSDirectory.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAzureADLSDirectory.java @@ -80,7 +80,7 @@ public class TestAzureADLSDirectory { executeTest(processorName, path); } - public void executeTest(String processorName, String path) { + private void executeTest(String processorName, String path) { String transitUri = String.format("abfs://%s@%s.dfs.core.windows.net%s/%s", ADLS_FILESYSTEM, ADLS_ACCOUNT, path, ADLS_FILENAME); ProvenanceEventRecord provenanceEvent = mockProvenanceEvent(processorName, transitUri);