mirror of https://github.com/apache/nifi.git

commit 84b561ad33 (parent d41f2e1d0a)

NIFI-8131: Support aws_s3_v2_directory in Atlas reporting task

This closes #4751.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>
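In short, for a transit URI such as s3a://mybucket/dir1/dir2/file1 (with an illustrative namespace ns1): the v1 model produces a single aws_s3_pseudo_dir entity (qualifiedName s3a://mybucket/dir1/dir2@ns1) referencing an aws_s3_bucket (s3a://mybucket@ns1), whereas the v2 model produces one aws_s3_v2_directory per path segment (s3a://mybucket/dir1/dir2/@ns1 nested under s3a://mybucket/dir1/@ns1) ending in an aws_s3_v2_bucket (s3a://mybucket@ns1). The exact formats are documented in the AwsS3Directory Javadoc in the diff below.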
AnalysisContext.java

@@ -31,4 +31,5 @@ public interface AnalysisContext {
     ComputeLineageResult queryLineage(long eventId);
     ComputeLineageResult findParents(long eventId);
     ProvenanceEventRecord getProvenanceEvent(long eventId);
+    String getAwsS3ModelVersion();
 }
StandardAnalysisContext.java

@@ -36,12 +36,14 @@ public class StandardAnalysisContext implements AnalysisContext {
     private final NiFiFlow nifiFlow;
     private final NamespaceResolver namespaceResolver;
     private final ProvenanceRepository provenanceRepository;
+    private final String awsS3ModelVersion;
 
     public StandardAnalysisContext(NiFiFlow nifiFlow, NamespaceResolver namespaceResolver,
-                                   ProvenanceRepository provenanceRepository) {
+                                   ProvenanceRepository provenanceRepository, String awsS3ModelVersion) {
         this.nifiFlow = nifiFlow;
         this.namespaceResolver = namespaceResolver;
         this.provenanceRepository = provenanceRepository;
+        this.awsS3ModelVersion = awsS3ModelVersion;
     }
 
     @Override
@@ -101,4 +103,8 @@ public class StandardAnalysisContext implements AnalysisContext {
         }
     }
 
+    @Override
+    public String getAwsS3ModelVersion() {
+        return awsS3ModelVersion;
+    }
 }
AwsS3Directory.java

@@ -16,54 +16,80 @@
  */
 package org.apache.nifi.atlas.provenance.analyzer;
 
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.utils.AtlasPathExtractorUtil;
+import org.apache.atlas.utils.PathExtractorContext;
 import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
 import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 
-import java.net.URI;
+import java.util.Map;
 
-import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName;
-import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
 
 /**
  * Analyze a transit URI as an AWS S3 directory (skipping the object name).
- * <li>qualifiedName=s3a://bucket/path@namespace (example: s3a://mybucket/mydir@ns1)
- * <li>name=/path (example: /mydir)
+ * The analyzer outputs a v1 or v2 AWS S3 directory entity depending on the 'AWS S3 Model Version' property configured on the reporting task.
+ * <p>
+ * Atlas entity hierarchy v1: aws_s3_pseudo_dir -> aws_s3_bucket
+ * <p>aws_s3_pseudo_dir
+ * <ul>
+ * <li>qualifiedName=s3a://bucket/path@namespace (example: s3a://mybucket/mydir1/mydir2@ns1)
+ * <li>name=/path (example: /mydir1/mydir2)
+ * </ul>
+ * <p>aws_s3_bucket
+ * <ul>
+ * <li>qualifiedName=s3a://bucket@namespace (example: s3a://mybucket@ns1)
+ * <li>name=bucket (example: mybucket)
+ * </ul>
+ * <p>
+ * Atlas entity hierarchy v2: aws_s3_v2_directory -> aws_s3_v2_directory -> ... -> aws_s3_v2_bucket
+ * <p>aws_s3_v2_directory
+ * <ul>
+ * <li>qualifiedName=s3a://bucket/path/@namespace (example: s3a://mybucket/mydir1/mydir2/@ns1)
+ * <li>name=directory (example: mydir2)
+ * </ul>
+ * <p>aws_s3_v2_bucket
+ * <ul>
+ * <li>qualifiedName=s3a://bucket@namespace (example: s3a://mybucket@ns1)
+ * <li>name=bucket (example: mybucket)
+ * </ul>
  */
 public class AwsS3Directory extends AbstractNiFiProvenanceEventAnalyzer {
 
-    private static final String TYPE_DIRECTORY = "aws_s3_pseudo_dir";
-    private static final String TYPE_BUCKET = "aws_s3_bucket";
+    public static final String TYPE_DIRECTORY_V1 = AtlasPathExtractorUtil.AWS_S3_PSEUDO_DIR;
+    public static final String TYPE_BUCKET_V1 = AtlasPathExtractorUtil.AWS_S3_BUCKET;
+    public static final String ATTR_BUCKET_V1 = AtlasPathExtractorUtil.ATTRIBUTE_BUCKET;
+    public static final String ATTR_OBJECT_PREFIX_V1 = AtlasPathExtractorUtil.ATTRIBUTE_OBJECT_PREFIX;
 
-    public static final String ATTR_OBJECT_PREFIX = "objectPrefix";
-    public static final String ATTR_BUCKET = "bucket";
+    public static final String TYPE_DIRECTORY_V2 = AtlasPathExtractorUtil.AWS_S3_V2_PSEUDO_DIR;
+    public static final String TYPE_BUCKET_V2 = AtlasPathExtractorUtil.AWS_S3_V2_BUCKET;
+    public static final String ATTR_CONTAINER_V2 = AtlasPathExtractorUtil.ATTRIBUTE_CONTAINER;
+    public static final String ATTR_OBJECT_PREFIX_V2 = AtlasPathExtractorUtil.ATTRIBUTE_OBJECT_PREFIX;
 
     @Override
     public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
-        final String transitUri = event.getTransitUri();
+        String transitUri = event.getTransitUri();
         if (transitUri == null) {
             return null;
         }
 
-        final String directoryUri;
-        if (StringUtils.countMatches(transitUri, '/') > 3) {
-            // directory exists => drop last '/' and the file name
-            directoryUri = transitUri.substring(0, transitUri.lastIndexOf('/'));
-        } else {
-            // no directory => keep last '/', drop only the file name
-            directoryUri = transitUri.substring(0, transitUri.lastIndexOf('/') + 1);
-        }
-        final URI uri = parseUri(directoryUri);
+        String directoryUri = transitUri.substring(0, transitUri.lastIndexOf('/') + 1);
+        Path path = new Path(directoryUri);
 
-        final String namespace = context.getNamespaceResolver().fromHostNames(uri.getHost());
+        String namespace = context.getNamespaceResolver().fromHostNames(path.toUri().getHost());
 
-        final Referenceable ref = createDirectoryRef(uri, namespace);
+        PathExtractorContext pathExtractorContext = new PathExtractorContext(namespace, context.getAwsS3ModelVersion());
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(path, pathExtractorContext);
 
-        return singleDataSetRef(event.getComponentId(), event.getEventType(), ref);
+        Referenceable ref = convertToReferenceable(entityWithExtInfo.getEntity(), pathExtractorContext.getKnownEntities());
+
+        return ref != null ? singleDataSetRef(event.getComponentId(), event.getEventType(), ref) : null;
     }
 
     @Override
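As a side note on the new extraction step: the sketch below is not part of the commit (the bucket, directories, and namespace "ns1" are made-up values); it drives the same Atlas utility that analyze() now delegates to and prints the entity chain it yields for a directory URI.

import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.utils.AtlasPathExtractorUtil;
import org.apache.atlas.utils.PathExtractorContext;
import org.apache.hadoop.fs.Path;

public class S3PathExtractionSketch {
    public static void main(String[] args) {
        // Directory URI with trailing slash, as analyze() derives it from the transit URI.
        Path path = new Path("s3a://mybucket/dir1/dir2/");

        // The Atlas v2 model-version constant selects aws_s3_v2_directory / aws_s3_v2_bucket;
        // the reporting task's v1 option passes the literal "v1" instead.
        PathExtractorContext extractorContext =
                new PathExtractorContext("ns1", AtlasPathExtractorUtil.AWS_S3_ATLAS_MODEL_VERSION_V2);

        AtlasEntity.AtlasEntityWithExtInfo extInfo = AtlasPathExtractorUtil.getPathEntity(path, extractorContext);

        // Deepest directory entity, e.g. qualifiedName=s3a://mybucket/dir1/dir2/@ns1
        System.out.println(extInfo.getEntity().getTypeName() + " -> " + extInfo.getEntity().getAttribute("qualifiedName"));

        // Parent directories and the bucket are collected in the known-entities map,
        // keyed by qualifiedName; convertToReferenceable() resolves the chain from here.
        extractorContext.getKnownEntities().forEach((qualifiedName, entity) ->
                System.out.println(entity.getTypeName() + " -> " + qualifiedName));
    }
}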
@@ -71,23 +97,34 @@ public class AwsS3Directory extends AbstractNiFiProvenanceEventAnalyzer {
         return "^s3a://.+/.+$";
     }
 
-    private Referenceable createDirectoryRef(URI uri, String namespace) {
-        final Referenceable ref = new Referenceable(TYPE_DIRECTORY);
-
-        ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, uri.toString().toLowerCase()));
-        ref.set(ATTR_NAME, uri.getPath().toLowerCase());
-        ref.set(ATTR_OBJECT_PREFIX, uri.getPath().toLowerCase());
-        ref.set(ATTR_BUCKET, createBucketRef(uri, namespace));
-
-        return ref;
-    }
-
-    private Referenceable createBucketRef(URI uri, String namespace) {
-        final Referenceable ref = new Referenceable(TYPE_BUCKET);
-
-        ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, String.format("%s://%s", uri.getScheme(), uri.getAuthority())));
-        ref.set(ATTR_NAME, uri.getAuthority());
+    private Referenceable convertToReferenceable(AtlasEntity entity, Map<String, AtlasEntity> knownEntities) {
+        if (entity == null) {
+            return null;
+        }
+
+        Referenceable ref = createReferenceable(entity);
+
+        if (TYPE_DIRECTORY_V1.equals(entity.getTypeName())) {
+            AtlasObjectId bucketObjectId = (AtlasObjectId) entity.getRelationshipAttribute(ATTR_BUCKET_V1);
+            if (bucketObjectId != null) {
+                AtlasEntity bucketEntity = knownEntities.get(bucketObjectId.getUniqueAttributes().get(ATTR_QUALIFIED_NAME));
+                ref.set(ATTR_BUCKET_V1, convertToReferenceable(bucketEntity, knownEntities));
+            }
+        } else if (TYPE_DIRECTORY_V2.equals(entity.getTypeName())) {
+            AtlasObjectId containerObjectId = (AtlasObjectId) entity.getRelationshipAttribute(ATTR_CONTAINER_V2);
+            if (containerObjectId != null) {
+                AtlasEntity containerEntity = knownEntities.get(containerObjectId.getUniqueAttributes().get(ATTR_QUALIFIED_NAME));
+                ref.set(ATTR_CONTAINER_V2, convertToReferenceable(containerEntity, knownEntities));
+            }
+        }
 
         return ref;
     }
+
+    private Referenceable createReferenceable(AtlasEntity entity) {
+        Referenceable ref = new Referenceable(entity.getTypeName());
+        ref.setValues(entity.getAttributes());
+        return ref;
+    }
 
 }
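The recursion above only re-shapes data: AtlasPathExtractorUtil reports relationships as AtlasObjectId references plus a known-entities map, while the reporting path needs fully nested Referenceables. A minimal hand-built illustration of that shape for the v2 model (hypothetical values; the nesting at the end mirrors what convertToReferenceable returns):

import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.v1.model.instance.Referenceable;

import java.util.HashMap;
import java.util.Map;

public class ConvertToReferenceableSketch {
    public static void main(String[] args) {
        // The bucket entity, as AtlasPathExtractorUtil would record it in knownEntities.
        AtlasEntity bucket = new AtlasEntity("aws_s3_v2_bucket");
        bucket.setAttribute("qualifiedName", "s3a://mybucket@ns1");
        bucket.setAttribute("name", "mybucket");

        // The directory entity: its "container" relationship holds only an object id,
        // not the bucket entity itself.
        AtlasEntity dir = new AtlasEntity("aws_s3_v2_directory");
        dir.setAttribute("qualifiedName", "s3a://mybucket/dir1/@ns1");
        dir.setAttribute("name", "dir1");
        dir.setRelationshipAttribute("container",
                new AtlasObjectId("aws_s3_v2_bucket", "qualifiedName", "s3a://mybucket@ns1"));

        Map<String, AtlasEntity> knownEntities = new HashMap<>();
        knownEntities.put("s3a://mybucket@ns1", bucket);

        // convertToReferenceable(dir, knownEntities) resolves the object id against
        // knownEntities and nests the bucket Referenceable under "container":
        Referenceable bucketRef = new Referenceable(bucket.getTypeName());
        bucketRef.setValues(bucket.getAttributes());
        Referenceable dirRef = new Referenceable(dir.getTypeName());
        dirRef.setValues(dir.getAttributes());
        dirRef.set("container", bucketRef);

        System.out.println(dirRef);
    }
}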
AzureADLSDirectory.java

@@ -34,17 +34,32 @@ import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
 
 /**
  * Analyze a transit URI as an Azure ADLS Gen2 directory (skipping the file name).
- * <li>qualifiedName=abfs://filesystem@account/path@namespace (example: abfs://myfilesystem@myaccount/mydir1/mydir2@ns1)
- * <li>name=directory (example: mydir2)
+ * <p>
+ * Atlas entity hierarchy: adls_gen2_directory -> adls_gen2_directory -> ... -> adls_gen2_container -> adls_gen2_account
+ * <p>adls_gen2_directory
+ * <ul>
+ * <li>qualifiedName=abfs://filesystem@account/path@namespace (example: abfs://myfilesystem@myaccount/mydir1/mydir2/@ns1)
+ * <li>name=directory (example: mydir2)
+ * </ul>
+ * <p>adls_gen2_container
+ * <ul>
+ * <li>qualifiedName=abfs://filesystem@account@namespace (example: abfs://myfilesystem@myaccount@ns1)
+ * <li>name=filesystem (example: myfilesystem)
+ * </ul>
+ * <p>adls_gen2_account
+ * <ul>
+ * <li>qualifiedName=abfs://account@namespace (example: abfs://myaccount@ns1)
+ * <li>name=account (example: myaccount)
+ * </ul>
  */
 public class AzureADLSDirectory extends AbstractNiFiProvenanceEventAnalyzer {
 
-    public static final String TYPE_DIRECTORY = "adls_gen2_directory";
-    public static final String TYPE_CONTAINER = "adls_gen2_container";
-    public static final String TYPE_ACCOUNT = "adls_gen2_account";
+    public static final String TYPE_DIRECTORY = AtlasPathExtractorUtil.ADLS_GEN2_DIRECTORY;
+    public static final String TYPE_CONTAINER = AtlasPathExtractorUtil.ADLS_GEN2_CONTAINER;
+    public static final String TYPE_ACCOUNT = AtlasPathExtractorUtil.ADLS_GEN2_ACCOUNT;
 
-    public static final String ATTR_PARENT = "parent";
-    public static final String ATTR_ACCOUNT = "account";
+    public static final String ATTR_PARENT = AtlasPathExtractorUtil.ATTRIBUTE_PARENT;
+    public static final String ATTR_ACCOUNT = AtlasPathExtractorUtil.ATTRIBUTE_ACCOUNT;
 
     @Override
     public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
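For comparison, the ADLS analyzer keeps building its Referenceable chain by hand. A small sketch (illustrative values only, not the commit's code) of the hierarchy the Javadoc above describes, using the "parent" and "account" attribute names from the constants:

import org.apache.atlas.v1.model.instance.Referenceable;

public class AdlsGen2HierarchySketch {
    public static void main(String[] args) {
        // Account at the root of the hierarchy.
        Referenceable account = new Referenceable("adls_gen2_account");
        account.set("qualifiedName", "abfs://myaccount@ns1");
        account.set("name", "myaccount");

        // Container (filesystem) pointing at its account via the "account" attribute.
        Referenceable container = new Referenceable("adls_gen2_container");
        container.set("qualifiedName", "abfs://myfilesystem@myaccount@ns1");
        container.set("name", "myfilesystem");
        container.set("account", account);

        // Directories chain upwards via the "parent" attribute.
        Referenceable dir1 = new Referenceable("adls_gen2_directory");
        dir1.set("qualifiedName", "abfs://myfilesystem@myaccount/dir1/@ns1");
        dir1.set("name", "dir1");
        dir1.set("parent", container);

        Referenceable dir2 = new Referenceable("adls_gen2_directory");
        dir2.set("qualifiedName", "abfs://myfilesystem@myaccount/dir1/dir2/@ns1");
        dir2.set("name", "dir2");
        dir2.set("parent", dir1);

        System.out.println(dir2);
    }
}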
ReportLineageToAtlas.java

@@ -21,6 +21,7 @@ import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.hook.AtlasHook;
 import org.apache.atlas.security.SecurityProperties;
+import org.apache.atlas.utils.AtlasPathExtractorUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.alias.CredentialProvider;
@@ -325,6 +326,22 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
             .defaultValue(LINEAGE_STRATEGY_SIMPLE_PATH.getValue())
             .build();
 
+    static final AllowableValue AWS_S3_MODEL_VERSION_V1 = new AllowableValue("v1", "v1",
+            "Creates AWS S3 directory entities version 1 (aws_s3_pseudo_dir).");
+    static final AllowableValue AWS_S3_MODEL_VERSION_V2 = new AllowableValue(AtlasPathExtractorUtil.AWS_S3_ATLAS_MODEL_VERSION_V2, "v2",
+            "Creates AWS S3 directory entities version 2 (aws_s3_v2_directory).");
+
+    static final PropertyDescriptor AWS_S3_MODEL_VERSION = new PropertyDescriptor.Builder()
+            .name("aws-s3-model-version")
+            .displayName("AWS S3 Model Version")
+            .description("Specifies what type of AWS S3 directory entities will be created in Atlas for s3a:// transit URIs (e.g. PutHDFS with S3 integration)." +
+                    " NOTE: Once this reporting task has started, it is strongly recommended to keep using the same AWS S3 entity model version in order to keep the Atlas data clean." +
+                    " Switching versions does not delete existing Atlas entities created with the old version, nor migrate them to the new version.")
+            .required(true)
+            .allowableValues(AWS_S3_MODEL_VERSION_V1, AWS_S3_MODEL_VERSION_V2)
+            .defaultValue(AWS_S3_MODEL_VERSION_V2.getValue())
+            .build();
+
     private static final String ATLAS_PROPERTIES_FILENAME = "atlas-application.properties";
     private static final String ATLAS_PROPERTY_CLIENT_CONNECT_TIMEOUT_MS = "atlas.client.connectTimeoutMSecs";
     private static final String ATLAS_PROPERTY_CLIENT_READ_TIMEOUT_MS = "atlas.client.readTimeoutMSecs";
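A tiny hypothetical JUnit check of the descriptor above (the commit does not include it; it assumes the task lives in package org.apache.nifi.atlas.reporting, so the package-private field is reachable from a test in the same package): the default is the v2 model, and the v2 allowable value is Atlas's own model-version constant rather than a bare literal.

package org.apache.nifi.atlas.reporting;

import org.apache.atlas.utils.AtlasPathExtractorUtil;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class TestAwsS3ModelVersionDefault {

    @Test
    public void defaultModelVersionIsV2() {
        // AWS_S3_MODEL_VERSION is package-private, hence the same-package placement.
        assertEquals(AtlasPathExtractorUtil.AWS_S3_ATLAS_MODEL_VERSION_V2,
                ReportLineageToAtlas.AWS_S3_MODEL_VERSION.getDefaultValue());
    }
}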
@@ -387,6 +404,9 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
         properties.add(ATLAS_CONNECT_TIMEOUT);
         properties.add(ATLAS_READ_TIMEOUT);
 
+        // Provenance event analyzer specific properties
+        properties.add(AWS_S3_MODEL_VERSION);
+
         return properties;
     }
 
@@ -848,9 +868,10 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
 
     private void consumeNiFiProvenanceEvents(ReportingContext context, NiFiFlow nifiFlow) {
         final EventAccess eventAccess = context.getEventAccess();
+        final String awsS3ModelVersion = context.getProperty(AWS_S3_MODEL_VERSION).getValue();
         final AnalysisContext analysisContext = new StandardAnalysisContext(nifiFlow, namespaceResolvers,
                 // FIXME: This class cast shouldn't be necessary to query lineage. Possible refactor target in next major update.
-                (ProvenanceRepository)eventAccess.getProvenanceRepository());
+                (ProvenanceRepository)eventAccess.getProvenanceRepository(), awsS3ModelVersion);
         consumer.consumeEvents(context, (componentMapHolder, events) -> {
             for (ProvenanceEventRecord event : events) {
                 try {
AbstractTestAwsS3Directory.java (renamed from TestAwsS3Directory.java)

@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.atlas.provenance.analyzer;
 
-import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
 import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
@@ -24,56 +23,25 @@ import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
 import org.apache.nifi.atlas.resolver.NamespaceResolver;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
-import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.when;
 
-public class TestAwsS3Directory {
+public abstract class AbstractTestAwsS3Directory {
 
-    private static final ProvenanceEventType PROVENANCE_EVENT_TYPE = ProvenanceEventType.SEND;
-    private static final String ATLAS_NAMESPACE = "namespace1";
-    private static final String AWS_BUCKET = "bucket1";
-    private static final String AWS_FILENAME = "file1";
+    protected static final ProvenanceEventType PROVENANCE_EVENT_TYPE = ProvenanceEventType.SEND;
+    protected static final String ATLAS_NAMESPACE = "namespace1";
+    protected static final String AWS_BUCKET = "bucket1";
+    protected static final String AWS_FILENAME = "file1";
 
-    @Test
-    public void testSimpleDirectory() {
-        String processorName = "PutHDFS";
-        String directory = "/dir1";
+    protected abstract String getAwsS3ModelVersion();
 
-        executeTest(processorName, directory);
-    }
+    protected abstract void assertAnalysisResult(DataSetRefs refs, String directory);
 
-    @Test
-    public void testCompoundDirectory() {
-        String processorName = "PutHDFS";
-        String directory = "/dir1/dir2/dir3/dir4/dir5";
-
-        executeTest(processorName, directory);
-    }
-
-    @Test
-    public void testRootDirectory() {
-        String processorName = "PutHDFS";
-        String directory = "/";
-
-        executeTest(processorName, directory);
-    }
-
-    @Test
-    public void testWithPutORC() {
-        String processorName = "PutORC";
-        String directory = "/dir1";
-
-        executeTest(processorName, directory);
-    }
-
-    public void executeTest(String processorName, String directory) {
+    protected void executeTest(String processorName, String directory) {
         String transitUri = createTransitUri(directory);
 
         ProvenanceEventRecord provenanceEvent = mockProvenanceEvent(processorName, transitUri);
@@ -110,6 +78,7 @@ public class TestAwsS3Directory {
 
         AnalysisContext analysisContext = Mockito.mock(AnalysisContext.class);
         when(analysisContext.getNamespaceResolver()).thenReturn(namespaceResolver);
+        when(analysisContext.getAwsS3ModelVersion()).thenReturn(getAwsS3ModelVersion());
 
         return analysisContext;
     }
@@ -118,25 +87,4 @@
         assertNotNull(analyzer);
         assertEquals(AwsS3Directory.class, analyzer.getClass());
     }
-
-    private void assertAnalysisResult(DataSetRefs refs, String directory) {
-        String expectedDirectoryQualifiedName = String.format("s3a://%s%s@%s", AWS_BUCKET, directory, ATLAS_NAMESPACE);
-        String expectedBucketQualifiedName = String.format("s3a://%s@%s", AWS_BUCKET, ATLAS_NAMESPACE);
-
-        assertEquals(0, refs.getInputs().size());
-        assertEquals(1, refs.getOutputs().size());
-
-        Referenceable directoryRef = refs.getOutputs().iterator().next();
-
-        assertEquals("aws_s3_pseudo_dir", directoryRef.getTypeName());
-        assertEquals(expectedDirectoryQualifiedName, directoryRef.get(ATTR_QUALIFIED_NAME));
-        assertEquals(directory, directoryRef.get(ATTR_NAME));
-        assertEquals(directory, directoryRef.get("objectPrefix"));
-
-        Referenceable bucketRef = (Referenceable) directoryRef.get("bucket");
-        assertNotNull(bucketRef);
-        assertEquals("aws_s3_bucket", bucketRef.getTypeName());
-        assertEquals(expectedBucketQualifiedName, bucketRef.get(ATTR_QUALIFIED_NAME));
-        assertEquals(AWS_BUCKET, bucketRef.get(ATTR_NAME));
-    }
 }
TestAwsS3DirectoryV1.java (new file)

@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.junit.Test;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory.ATTR_BUCKET_V1;
+import static org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory.ATTR_OBJECT_PREFIX_V1;
+import static org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory.TYPE_BUCKET_V1;
+import static org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory.TYPE_DIRECTORY_V1;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class TestAwsS3DirectoryV1 extends AbstractTestAwsS3Directory {
+
+    @Override
+    protected String getAwsS3ModelVersion() {
+        return "v1";
+    }
+
+    @Test
+    public void testSimpleDirectory() {
+        String processorName = "PutHDFS";
+        String dirPath = "/dir1";
+
+        executeTest(processorName, dirPath);
+    }
+
+    @Test
+    public void testCompoundDirectory() {
+        String processorName = "PutHDFS";
+        String dirPath = "/dir1/dir2/dir3/dir4/dir5";
+
+        executeTest(processorName, dirPath);
+    }
+
+    @Test
+    public void testRootDirectory() {
+        String processorName = "PutHDFS";
+        String dirPath = "/";
+
+        executeTest(processorName, dirPath);
+    }
+
+    @Test
+    public void testWithPutORC() {
+        String processorName = "PutORC";
+        String dirPath = "/dir1";
+
+        executeTest(processorName, dirPath);
+    }
+
+    @Override
+    protected void assertAnalysisResult(DataSetRefs refs, String dirPath) {
+        String expectedDirectoryQualifiedName = String.format("s3a://%s%s@%s", AWS_BUCKET, dirPath, ATLAS_NAMESPACE);
+        String expectedBucketQualifiedName = String.format("s3a://%s@%s", AWS_BUCKET, ATLAS_NAMESPACE);
+
+        assertEquals(0, refs.getInputs().size());
+        assertEquals(1, refs.getOutputs().size());
+
+        Referenceable directoryRef = refs.getOutputs().iterator().next();
+
+        assertEquals(TYPE_DIRECTORY_V1, directoryRef.getTypeName());
+        assertEquals(expectedDirectoryQualifiedName, directoryRef.get(ATTR_QUALIFIED_NAME));
+        assertEquals(dirPath, directoryRef.get(ATTR_NAME));
+        assertEquals(dirPath, directoryRef.get(ATTR_OBJECT_PREFIX_V1));
+
+        Referenceable bucketRef = (Referenceable) directoryRef.get(ATTR_BUCKET_V1);
+        assertNotNull(bucketRef);
+        assertEquals(TYPE_BUCKET_V1, bucketRef.getTypeName());
+        assertEquals(expectedBucketQualifiedName, bucketRef.get(ATTR_QUALIFIED_NAME));
+        assertEquals(AWS_BUCKET, bucketRef.get(ATTR_NAME));
+    }
+}
TestAwsS3DirectoryV2.java (new file)

@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.atlas.utils.AtlasPathExtractorUtil;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.junit.Test;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory.ATTR_CONTAINER_V2;
+import static org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory.ATTR_OBJECT_PREFIX_V2;
+import static org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory.TYPE_BUCKET_V2;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class TestAwsS3DirectoryV2 extends AbstractTestAwsS3Directory {
+
+    @Override
+    protected String getAwsS3ModelVersion() {
+        return AtlasPathExtractorUtil.AWS_S3_ATLAS_MODEL_VERSION_V2;
+    }
+
+    @Test
+    public void testSimpleDirectory() {
+        String processorName = "PutHDFS";
+        String dirPath = "/dir1";
+
+        executeTest(processorName, dirPath);
+    }
+
+    @Test
+    public void testCompoundDirectory() {
+        String processorName = "PutHDFS";
+        String dirPath = "/dir1/dir2/dir3/dir4/dir5";
+
+        executeTest(processorName, dirPath);
+    }
+
+    @Test
+    public void testRootDirectory() {
+        String processorName = "PutHDFS";
+        String dirPath = "/";
+
+        executeTest(processorName, dirPath);
+    }
+
+    @Test
+    public void testWithPutORC() {
+        String processorName = "PutORC";
+        String dirPath = "/dir1";
+
+        executeTest(processorName, dirPath);
+    }
+
+    protected void assertAnalysisResult(DataSetRefs refs, String dirPath) {
+        assertEquals(0, refs.getInputs().size());
+        assertEquals(1, refs.getOutputs().size());
+
+        Referenceable ref = refs.getOutputs().iterator().next();
+
+        String actualPath = dirPath;
+        while (StringUtils.isNotEmpty(actualPath) && !"/".equals(actualPath)) {
+            String directory = StringUtils.substringAfterLast(actualPath, "/");
+
+            assertEquals(AwsS3Directory.TYPE_DIRECTORY_V2, ref.getTypeName());
+            assertEquals(String.format("s3a://%s%s/@%s", AWS_BUCKET, actualPath, ATLAS_NAMESPACE), ref.get(ATTR_QUALIFIED_NAME));
+            assertEquals(directory, ref.get(ATTR_NAME));
+            assertEquals(actualPath + "/", ref.get(ATTR_OBJECT_PREFIX_V2));
+            assertNotNull(ref.get(ATTR_CONTAINER_V2));
+
+            ref = (Referenceable) ref.get(ATTR_CONTAINER_V2);
+            actualPath = StringUtils.substringBeforeLast(actualPath, "/");
+        }
+
+        assertEquals(TYPE_BUCKET_V2, ref.getTypeName());
+        assertEquals(String.format("s3a://%s@%s", AWS_BUCKET, ATLAS_NAMESPACE), ref.get(ATTR_QUALIFIED_NAME));
+        assertEquals(AWS_BUCKET, ref.get(ATTR_NAME));
+    }
+}
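As a worked trace of the loop above, for dirPath = "/dir1/dir2" (with AWS_BUCKET = bucket1 and ATLAS_NAMESPACE = namespace1 from the base class) the assertions expect:

aws_s3_v2_directory  qualifiedName=s3a://bucket1/dir1/dir2/@namespace1  name=dir2  objectPrefix=/dir1/dir2/
aws_s3_v2_directory  qualifiedName=s3a://bucket1/dir1/@namespace1       name=dir1  objectPrefix=/dir1/
aws_s3_v2_bucket     qualifiedName=s3a://bucket1@namespace1             name=bucket1

with each level reached through the container attribute of the previous one.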
TestAzureADLSDirectory.java

@@ -80,7 +80,7 @@ public class TestAzureADLSDirectory {
         executeTest(processorName, path);
     }
 
-    public void executeTest(String processorName, String path) {
+    private void executeTest(String processorName, String path) {
         String transitUri = String.format("abfs://%s@%s.dfs.core.windows.net%s/%s", ADLS_FILESYSTEM, ADLS_ACCOUNT, path, ADLS_FILENAME);
 
         ProvenanceEventRecord provenanceEvent = mockProvenanceEvent(processorName, transitUri);