NIFI-8030: Added property to ReportLineageToAtlas to configure file/directory level logging of [hd]fs_path entities to Atlas

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4813.
This commit is contained in:
Peter Turcsanyi 2021-02-08 20:29:10 +01:00 committed by Pierre Villard
parent 34fc94454f
commit 93b1a05dc3
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
9 changed files with 255 additions and 21 deletions

View File

@ -32,4 +32,5 @@ public interface AnalysisContext {
ComputeLineageResult findParents(long eventId);
ProvenanceEventRecord getProvenanceEvent(long eventId);
String getAwsS3ModelVersion();
FilesystemPathsLevel getFilesystemPathsLevel();
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.atlas.provenance;
/**
 * Granularity at which filesystem path entities are reported.
 * Each constant carries a human-readable label that can be shown to the user.
 */
public enum FilesystemPathsLevel {

    /** Report the full path, including the file name. */
    FILE("File"),

    /** Report only the path of the parent directory, without the file name. */
    DIRECTORY("Directory");

    /** Human-readable label of this level. */
    private final String displayName;

    FilesystemPathsLevel(final String label) {
        displayName = label;
    }

    /**
     * @return the human-readable label of this level
     */
    public String getDisplayName() {
        return this.displayName;
    }
}

View File

@ -37,13 +37,15 @@ public class StandardAnalysisContext implements AnalysisContext {
private final NamespaceResolver namespaceResolver;
private final ProvenanceRepository provenanceRepository;
private final String awsS3ModelVersion;
private final FilesystemPathsLevel filesystemPathsLevel;
public StandardAnalysisContext(NiFiFlow nifiFlow, NamespaceResolver namespaceResolver,
ProvenanceRepository provenanceRepository, String awsS3ModelVersion) {
ProvenanceRepository provenanceRepository, String awsS3ModelVersion, FilesystemPathsLevel filesystemPathsLevel) {
this.nifiFlow = nifiFlow;
this.namespaceResolver = namespaceResolver;
this.provenanceRepository = provenanceRepository;
this.awsS3ModelVersion = awsS3ModelVersion;
this.filesystemPathsLevel = filesystemPathsLevel;
}
@Override
@ -107,4 +109,9 @@ public class StandardAnalysisContext implements AnalysisContext {
public String getAwsS3ModelVersion() {
return awsS3ModelVersion;
}
/**
 * Returns the filesystem paths level that was supplied to the constructor.
 *
 * @return the level (File or Directory) held by this context
 */
@Override
public FilesystemPathsLevel getFilesystemPathsLevel() {
return filesystemPathsLevel;
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.FilesystemPathsLevel;
import java.net.URI;
/**
 * Base class for analyzers that turn a transit URI into a filesystem path entity.
 * Provides the shared logic for choosing between the file path and its parent directory
 * according to the {@link FilesystemPathsLevel} configured in the {@link AnalysisContext}.
 */
public abstract class AbstractFileSystemPathAnalyzer extends AbstractNiFiProvenanceEventAnalyzer {

    private static final String PATH_SEPARATOR = "/";

    /**
     * Extracts the path to report for the given URI.
     * <ul>
     * <li>At {@link FilesystemPathsLevel#DIRECTORY} level the part after the last {@code /} is dropped
     * (e.g. {@code /user/nifi/fileA} becomes {@code /user/nifi}); a file directly under the root
     * yields {@code /}.</li>
     * <li>At any other level the URI path is returned unchanged.</li>
     * </ul>
     *
     * @param context the analysis context supplying the configured filesystem paths level
     * @param uri the URI whose path component is used
     * @return the file or directory path, or {@code null} when the URI has no path component
     *         (for example an opaque URI), regardless of the configured level
     */
    public String getPath(AnalysisContext context, URI uri) {
        final String fullPath = uri.getPath();

        // URI.getPath() is null for opaque URIs. Pass that through for both levels instead of
        // failing with a NullPointerException only when Directory level is configured.
        if (fullPath == null || context.getFilesystemPathsLevel() != FilesystemPathsLevel.DIRECTORY) {
            return fullPath;
        }

        final String dirPath = StringUtils.substringBeforeLast(fullPath, PATH_SEPARATOR);

        // An empty result means the only separator was the leading one, i.e. the parent is the root.
        return dirPath.isEmpty() ? PATH_SEPARATOR : dirPath;
    }
}

View File

@ -17,11 +17,10 @@
package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -35,13 +34,13 @@ import static org.apache.nifi.atlas.NiFiTypes.ATTR_PATH;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
/**
* Analyze a transit URI as a file system path.
* <li>qualifiedName=/path/fileName@hostname (example: /tmp/dir/filename.txt@host.example.com)
* <li>name=/path/fileName (example: /tmp/dir/filename.txt)
* Analyze a transit URI as a file system path. Return file or directory path depending on FilesystemPathsLevel setting.
* <li>qualifiedName=/path[/fileName]@namespace (example: /tmp/dir[/filename.txt]@ns1)
* <li>name=/path[/fileName] (example: /tmp/dir[/filename.txt])
*/
public class FilePath extends AbstractNiFiProvenanceEventAnalyzer {
public class FilePath extends AbstractFileSystemPathAnalyzer {
private static final Logger logger = LoggerFactory.getLogger(FilePath.class);
private static final Logger LOGGER = LoggerFactory.getLogger(FilePath.class);
private static final String TYPE = "fs_path";
@ -56,11 +55,12 @@ public class FilePath extends AbstractNiFiProvenanceEventAnalyzer {
final String hostname = StringUtils.isEmpty(uriHost) ? InetAddress.getLocalHost().getHostName() : uriHost;
namespace = context.getNamespaceResolver().fromHostNames(hostname);
} catch (UnknownHostException e) {
logger.warn("Failed to get localhost name due to " + e, e);
LOGGER.warn("Failed to get localhost name due to " + e, e);
return null;
}
final String path = uri.getPath();
final String path = getPath(context, uri);
ref.set(ATTR_NAME, path);
ref.set(ATTR_PATH, path);
ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, path));

View File

@ -17,7 +17,6 @@
package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.provenance.ProvenanceEventRecord;
@ -31,11 +30,11 @@ import static org.apache.nifi.atlas.NiFiTypes.ATTR_PATH;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
/**
* Analyze a transit URI as a HDFS path.
* <li>qualifiedName=/path/fileName@namespace (example: /app/warehouse/hive/db/default@ns1)
* <li>name=/path/fileName (example: /app/warehouse/hive/db/default)
* Analyze a transit URI as a HDFS path. Return file or directory path depending on FilesystemPathsLevel setting.
* <li>qualifiedName=/path[/fileName]@namespace (example: /app/warehouse/hive/db/default[/datafile]@ns1)
* <li>name=/path[/fileName] (example: /app/warehouse/hive/db/default[/datafile])
*/
public class HDFSPath extends AbstractNiFiProvenanceEventAnalyzer {
public class HDFSPath extends AbstractFileSystemPathAnalyzer {
private static final String TYPE = "hdfs_path";
@ -44,7 +43,9 @@ public class HDFSPath extends AbstractNiFiProvenanceEventAnalyzer {
final Referenceable ref = new Referenceable(TYPE);
final URI uri = parseUri(event.getTransitUri());
final String namespace = context.getNamespaceResolver().fromHostNames(uri.getHost());
final String path = uri.getPath();
final String path = getPath(context, uri);
ref.set(ATTR_NAME, path);
ref.set(ATTR_PATH, path);
// The attribute 'clusterName' is in the 'hdfs_path' Atlas entity so it cannot be changed.

View File

@ -41,6 +41,7 @@ import org.apache.nifi.atlas.NiFiFlow;
import org.apache.nifi.atlas.NiFiFlowAnalyzer;
import org.apache.nifi.atlas.hook.NiFiAtlasHook;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.FilesystemPathsLevel;
import org.apache.nifi.atlas.provenance.StandardAnalysisContext;
import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage;
import org.apache.nifi.atlas.provenance.lineage.LineageStrategy;
@ -343,6 +344,23 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
.defaultValue(AWS_S3_MODEL_VERSION_V2.getValue())
.build();
// Allowable value selecting File level; its value is the enum constant name so it can be parsed with FilesystemPathsLevel.valueOf().
static final AllowableValue FILESYSTEM_PATHS_LEVEL_FILE = new AllowableValue(FilesystemPathsLevel.FILE.name(), FilesystemPathsLevel.FILE.getDisplayName(),
"Creates File level paths.");
// Allowable value selecting Directory level; built the same way from the DIRECTORY enum constant.
static final AllowableValue FILESYSTEM_PATHS_LEVEL_DIRECTORY = new AllowableValue(FilesystemPathsLevel.DIRECTORY.name(), FilesystemPathsLevel.DIRECTORY.getDisplayName(),
"Creates Directory level paths.");
/**
 * Required property choosing whether fs_path and hdfs_path entities are reported with the full file path
 * or with the parent directory only. Defaults to File level.
 */
static final PropertyDescriptor FILESYSTEM_PATHS_LEVEL = new PropertyDescriptor.Builder()
.name("filesystem-paths-level")
.displayName("Filesystem Path Entities Level")
.description("Specifies how the filesystem path entities (fs_path and hdfs_path) will be logged in Atlas: File or Directory level. In case of File level, each individual file entity " +
"will be sent to Atlas as a separate entity with the full path including the filename. Directory level only logs the path of the parent directory without the filename. " +
"This setting affects processors working with files, like GetFile or PutHDFS. NOTE: Although the default value is File level for backward compatibility reasons, " +
"it is highly recommended to set it to Directory level because File level logging can generate a huge number of entities in Atlas.")
.required(true)
.allowableValues(FILESYSTEM_PATHS_LEVEL_FILE, FILESYSTEM_PATHS_LEVEL_DIRECTORY)
.defaultValue(FILESYSTEM_PATHS_LEVEL_FILE.getValue())
.build();
private static final String ATLAS_PROPERTIES_FILENAME = "atlas-application.properties";
private static final String ATLAS_PROPERTY_CLIENT_CONNECT_TIMEOUT_MS = "atlas.client.connectTimeoutMSecs";
private static final String ATLAS_PROPERTY_CLIENT_READ_TIMEOUT_MS = "atlas.client.readTimeoutMSecs";
@ -405,6 +423,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
// Provenance event analyzer specific properties
properties.add(AWS_S3_MODEL_VERSION);
properties.add(FILESYSTEM_PATHS_LEVEL);
return properties;
}
@ -868,9 +887,10 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
private void consumeNiFiProvenanceEvents(ReportingContext context, NiFiFlow nifiFlow) {
final EventAccess eventAccess = context.getEventAccess();
final String awsS3ModelVersion = context.getProperty(AWS_S3_MODEL_VERSION).getValue();
final FilesystemPathsLevel filesystemPathsLevel = FilesystemPathsLevel.valueOf(context.getProperty(FILESYSTEM_PATHS_LEVEL).getValue());
final AnalysisContext analysisContext = new StandardAnalysisContext(nifiFlow, namespaceResolvers,
// FIXME: This class cast shouldn't be necessary to query lineage. Possible refactor target in next major update.
(ProvenanceRepository)eventAccess.getProvenanceRepository(), awsS3ModelVersion);
(ProvenanceRepository)eventAccess.getProvenanceRepository(), awsS3ModelVersion, filesystemPathsLevel);
consumer.consumeEvents(context, (componentMapHolder, events) -> {
for (ProvenanceEventRecord event : events) {
try {

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.FilesystemPathsLevel;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
import org.apache.nifi.atlas.resolver.NamespaceResolvers;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.junit.Test;
import org.mockito.Mockito;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
/**
 * Checks the fs_path entity created for a PutFile SEND event at both
 * File and Directory filesystem paths levels.
 */
public class TestFilePath {

    @Test
    public void testFilePathWithFileLevel() {
        // File level keeps the complete path including the file name.
        verifyFsPath("file:/user/nifi/fileA", FilesystemPathsLevel.FILE, "/user/nifi/fileA");
    }

    @Test
    public void testFilePathWithDirectoryLevel() {
        // Directory level drops the file name and keeps the parent directory.
        verifyFsPath("file:/user/nifi/fileA", FilesystemPathsLevel.DIRECTORY, "/user/nifi");
    }

    @Test
    public void testFilePathRootDirWithFileLevel() {
        // A file directly under the root is reported as-is at File level.
        verifyFsPath("file:/fileA", FilesystemPathsLevel.FILE, "/fileA");
    }

    @Test
    public void testFilePathRootDirWithDirectoryLevel() {
        // A file directly under the root has the root itself as its directory.
        verifyFsPath("file:/fileA", FilesystemPathsLevel.DIRECTORY, "/");
    }

    /**
     * Runs the analyzer selected for a PutFile SEND event against mocked collaborators and
     * asserts that exactly one fs_path output with the expected name and qualified name is produced.
     *
     * @param transitUri the transit URI of the mocked provenance event
     * @param level the filesystem paths level returned by the mocked analysis context
     * @param expectedPath the path expected in the name and qualified name of the output
     */
    private void verifyFsPath(String transitUri, FilesystemPathsLevel level, String expectedPath) {
        final String componentType = "PutFile";

        final ProvenanceEventRecord event = Mockito.mock(ProvenanceEventRecord.class);
        when(event.getComponentType()).thenReturn(componentType);
        when(event.getTransitUri()).thenReturn(transitUri);
        when(event.getEventType()).thenReturn(ProvenanceEventType.SEND);

        final NamespaceResolvers resolvers = Mockito.mock(NamespaceResolvers.class);
        when(resolvers.fromHostNames(anyString())).thenReturn("namespace1");

        final AnalysisContext analysisContext = Mockito.mock(AnalysisContext.class);
        when(analysisContext.getNamespaceResolver()).thenReturn(resolvers);
        when(analysisContext.getFilesystemPathsLevel()).thenReturn(level);

        final NiFiProvenanceEventAnalyzer analyzer =
                NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, event.getEventType());
        assertNotNull(analyzer);
        assertEquals(FilePath.class, analyzer.getClass());

        final DataSetRefs dataSetRefs = analyzer.analyze(analysisContext, event);
        assertEquals(0, dataSetRefs.getInputs().size());
        assertEquals(1, dataSetRefs.getOutputs().size());

        final Referenceable output = dataSetRefs.getOutputs().iterator().next();
        assertEquals("fs_path", output.getTypeName());
        assertEquals(expectedPath, output.get(ATTR_NAME));
        assertEquals(expectedPath + "@namespace1", output.get(ATTR_QUALIFIED_NAME));
    }
}

View File

@ -19,6 +19,7 @@ package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.FilesystemPathsLevel;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
import org.apache.nifi.atlas.resolver.NamespaceResolvers;
@ -37,10 +38,40 @@ import static org.mockito.Mockito.when;
public class TestHDFSPath {
@Test
public void testHDFSPath() {
final String processorName = "PutHDFS";
public void testHDFSPathWithFileLevel() {
// TODO: what if with HA namenode?
final String transitUri = "hdfs://0.example.com:8020/user/nifi/fileA";
final FilesystemPathsLevel filesystemPathsLevel = FilesystemPathsLevel.FILE;
final String expectedPath = "/user/nifi/fileA";
testHDFSPath(transitUri, filesystemPathsLevel, expectedPath);
}
@Test
public void testHDFSPathWithDirectoryLevel() {
    // Directory level drops the file name and keeps the parent directory.
    testHDFSPath("hdfs://0.example.com:8020/user/nifi/fileA", FilesystemPathsLevel.DIRECTORY, "/user/nifi");
}
@Test
public void testHDFSPathRootDirWithFileLevel() {
    // A file directly under the root is reported as-is at File level.
    testHDFSPath("hdfs://0.example.com:8020/fileA", FilesystemPathsLevel.FILE, "/fileA");
}
@Test
public void testHDFSPathRootDirWithDirectoryLevel() {
    // A file directly under the root has the root itself as its directory.
    testHDFSPath("hdfs://0.example.com:8020/fileA", FilesystemPathsLevel.DIRECTORY, "/");
}
private void testHDFSPath(String transitUri, FilesystemPathsLevel filesystemPathsLevel, String expectedPath) {
final String processorName = "PutHDFS";
final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
when(record.getComponentType()).thenReturn(processorName);
when(record.getTransitUri()).thenReturn(transitUri);
@ -51,16 +82,18 @@ public class TestHDFSPath {
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
when(context.getFilesystemPathsLevel()).thenReturn(filesystemPathsLevel);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
assertNotNull(analyzer);
assertEquals(HDFSPath.class, analyzer.getClass());
final DataSetRefs refs = analyzer.analyze(context, record);
assertEquals(0, refs.getInputs().size());
assertEquals(1, refs.getOutputs().size());
Referenceable ref = refs.getOutputs().iterator().next();
assertEquals("hdfs_path", ref.getTypeName());
assertEquals("/user/nifi/fileA", ref.get(ATTR_NAME));
assertEquals("/user/nifi/fileA@namespace1", ref.get(ATTR_QUALIFIED_NAME));
assertEquals(expectedPath, ref.get(ATTR_NAME));
assertEquals(expectedPath + "@namespace1", ref.get(ATTR_QUALIFIED_NAME));
}
}