mirror of https://github.com/apache/nifi.git
NIFI-7422: Support aws_s3_pseudo_dir in Atlas reporting task
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #4292.
This commit is contained in:
parent
a1f277fdf7
commit
06864c830c
|
@ -0,0 +1,93 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.atlas.provenance.analyzer;
|
||||||
|
|
||||||
|
import org.apache.atlas.v1.model.instance.Referenceable;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
|
||||||
|
import org.apache.nifi.atlas.provenance.AnalysisContext;
|
||||||
|
import org.apache.nifi.atlas.provenance.DataSetRefs;
|
||||||
|
import org.apache.nifi.provenance.ProvenanceEventRecord;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
|
||||||
|
import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName;
|
||||||
|
import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
|
||||||
|
import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Analyze a transit URI as an AWS S3 directory (skipping the object name).
|
||||||
|
* <li>qualifiedName=s3a://bucket/path@namespace (example: s3a://mybucket/mydir@ns1)
|
||||||
|
* <li>name=/path (example: /mydir)
|
||||||
|
*/
|
||||||
|
public class AwsS3Directory extends AbstractNiFiProvenanceEventAnalyzer {
|
||||||
|
|
||||||
|
private static final String TYPE_DIRECTORY = "aws_s3_pseudo_dir";
|
||||||
|
private static final String TYPE_BUCKET = "aws_s3_bucket";
|
||||||
|
|
||||||
|
public static final String ATTR_OBJECT_PREFIX = "objectPrefix";
|
||||||
|
public static final String ATTR_BUCKET = "bucket";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
|
||||||
|
final String transitUri = event.getTransitUri();
|
||||||
|
if (transitUri == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
final String directoryUri;
|
||||||
|
if (StringUtils.countMatches(transitUri, '/') > 3) {
|
||||||
|
// directory exists => drop last '/' and the file name
|
||||||
|
directoryUri = transitUri.substring(0, transitUri.lastIndexOf('/'));
|
||||||
|
} else {
|
||||||
|
// no directory => keep last '/', drop only the file name
|
||||||
|
directoryUri = transitUri.substring(0, transitUri.lastIndexOf('/') + 1);
|
||||||
|
}
|
||||||
|
final URI uri = parseUri(directoryUri);
|
||||||
|
|
||||||
|
final String namespace = context.getNamespaceResolver().fromHostNames(uri.getHost());
|
||||||
|
|
||||||
|
final Referenceable ref = createDirectoryRef(uri, namespace);
|
||||||
|
|
||||||
|
return singleDataSetRef(event.getComponentId(), event.getEventType(), ref);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String targetTransitUriPattern() {
|
||||||
|
return "^s3a://.+/.+$";
|
||||||
|
}
|
||||||
|
|
||||||
|
private Referenceable createDirectoryRef(URI uri, String namespace) {
|
||||||
|
final Referenceable ref = new Referenceable(TYPE_DIRECTORY);
|
||||||
|
|
||||||
|
ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, uri.toString().toLowerCase()));
|
||||||
|
ref.set(ATTR_NAME, uri.getPath().toLowerCase());
|
||||||
|
ref.set(ATTR_OBJECT_PREFIX, uri.getPath().toLowerCase());
|
||||||
|
ref.set(ATTR_BUCKET, createBucketRef(uri, namespace));
|
||||||
|
|
||||||
|
return ref;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Referenceable createBucketRef(URI uri, String namespace) {
|
||||||
|
final Referenceable ref = new Referenceable(TYPE_BUCKET);
|
||||||
|
|
||||||
|
ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, String.format("%s://%s", uri.getScheme(), uri.getAuthority())));
|
||||||
|
ref.set(ATTR_NAME, uri.getAuthority());
|
||||||
|
|
||||||
|
return ref;
|
||||||
|
}
|
||||||
|
}
|
|
@ -24,6 +24,7 @@ org.apache.nifi.atlas.provenance.analyzer.Hive2JDBC
|
||||||
org.apache.nifi.atlas.provenance.analyzer.HDFSPath
|
org.apache.nifi.atlas.provenance.analyzer.HDFSPath
|
||||||
org.apache.nifi.atlas.provenance.analyzer.HBaseTable
|
org.apache.nifi.atlas.provenance.analyzer.HBaseTable
|
||||||
org.apache.nifi.atlas.provenance.analyzer.FilePath
|
org.apache.nifi.atlas.provenance.analyzer.FilePath
|
||||||
|
org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory
|
||||||
|
|
||||||
# By event type, if none of above analyzers matches
|
# By event type, if none of above analyzers matches
|
||||||
org.apache.nifi.atlas.provenance.analyzer.unknown.Create
|
org.apache.nifi.atlas.provenance.analyzer.unknown.Create
|
||||||
|
|
|
@ -471,8 +471,9 @@ remote target port
|
||||||
FetchHDFS<br/>
|
FetchHDFS<br/>
|
||||||
FetchParquet<br/>
|
FetchParquet<br/>
|
||||||
GetHDFS<br/>
|
GetHDFS<br/>
|
||||||
GetHDFSSequenceFIle<br/>
|
GetHDFSSequenceFile<br/>
|
||||||
PutHDFS<br/>
|
PutHDFS<br/>
|
||||||
|
PutORC<br/>
|
||||||
PutParquet<br/>
|
PutParquet<br/>
|
||||||
</td>
|
</td>
|
||||||
<td>
|
<td>
|
||||||
|
@ -483,12 +484,40 @@ remote target port
|
||||||
RECEIVE<br/>
|
RECEIVE<br/>
|
||||||
SEND<br/>
|
SEND<br/>
|
||||||
SEND<br/>
|
SEND<br/>
|
||||||
|
SEND<br/>
|
||||||
</td>
|
</td>
|
||||||
<td>hdfs://nn.example.com:8020/user/nifi/5262553828219</td>
|
<td>hdfs://nn.example.com:8020/user/nifi/5262553828219</td>
|
||||||
<td>hdfs_path</td>
|
<td>hdfs_path</td>
|
||||||
<td>/path/fileName@namespace<br/>(e.g. /app/warehouse/hive/db/default@ns1)</td>
|
<td>/path/fileName@namespace<br/>(e.g. /app/warehouse/hive/db/default@ns1)</td>
|
||||||
<td></td>
|
<td></td>
|
||||||
</tr>
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<td>AwsS3Directory</td>
|
||||||
|
<td>
|
||||||
|
DeleteHDFS<br/>
|
||||||
|
FetchHDFS<br/>
|
||||||
|
FetchParquet<br/>
|
||||||
|
GetHDFS<br/>
|
||||||
|
GetHDFSSequenceFile<br/>
|
||||||
|
PutHDFS<br/>
|
||||||
|
PutORC<br/>
|
||||||
|
PutParquet<br/>
|
||||||
|
</td>
|
||||||
|
<td>
|
||||||
|
REMOTE_INVOCATION<br/>
|
||||||
|
FETCH<br/>
|
||||||
|
FETCH<br/>
|
||||||
|
RECEIVE<br/>
|
||||||
|
RECEIVE<br/>
|
||||||
|
SEND<br/>
|
||||||
|
SEND<br/>
|
||||||
|
SEND<br/>
|
||||||
|
</td>
|
||||||
|
<td>s3a://mybucket/mydir</td>
|
||||||
|
<td>aws_s3_pseudo_dir</td>
|
||||||
|
<td>s3UrlWithoutObjectName@namespace<br/>(e.g. s3a://mybucket/mydir@ns1)</td>
|
||||||
|
<td></td>
|
||||||
|
</tr>
|
||||||
<tr>
|
<tr>
|
||||||
<td>HBaseTable</td>
|
<td>HBaseTable</td>
|
||||||
<td>
|
<td>
|
||||||
|
|
|
@ -0,0 +1,142 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.atlas.provenance.analyzer;
|
||||||
|
|
||||||
|
import org.apache.atlas.v1.model.instance.Referenceable;
|
||||||
|
import org.apache.nifi.atlas.provenance.AnalysisContext;
|
||||||
|
import org.apache.nifi.atlas.provenance.DataSetRefs;
|
||||||
|
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
|
||||||
|
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
|
||||||
|
import org.apache.nifi.atlas.resolver.NamespaceResolver;
|
||||||
|
import org.apache.nifi.provenance.ProvenanceEventRecord;
|
||||||
|
import org.apache.nifi.provenance.ProvenanceEventType;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
|
||||||
|
import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
public class TestAwsS3Directory {
|
||||||
|
|
||||||
|
private static final ProvenanceEventType PROVENANCE_EVENT_TYPE = ProvenanceEventType.SEND;
|
||||||
|
private static final String ATLAS_NAMESPACE = "namespace1";
|
||||||
|
private static final String AWS_BUCKET = "bucket1";
|
||||||
|
private static final String AWS_FILENAME = "file1";
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleDirectory() {
|
||||||
|
String processorName = "PutHDFS";
|
||||||
|
String directory = "/dir1";
|
||||||
|
|
||||||
|
executeTest(processorName, directory);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCompoundDirectory() {
|
||||||
|
String processorName = "PutHDFS";
|
||||||
|
String directory = "/dir1/dir2/dir3/dir4/dir5";
|
||||||
|
|
||||||
|
executeTest(processorName, directory);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRootDirectory() {
|
||||||
|
String processorName = "PutHDFS";
|
||||||
|
String directory = "/";
|
||||||
|
|
||||||
|
executeTest(processorName, directory);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithPutORC() {
|
||||||
|
String processorName = "PutORC";
|
||||||
|
String directory = "/dir1";
|
||||||
|
|
||||||
|
executeTest(processorName, directory);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void executeTest(String processorName, String directory) {
|
||||||
|
String transitUri = createTransitUri(directory);
|
||||||
|
|
||||||
|
ProvenanceEventRecord provenanceEvent = mockProvenanceEvent(processorName, transitUri);
|
||||||
|
AnalysisContext analysisContext = mockAnalysisContext();
|
||||||
|
|
||||||
|
NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, PROVENANCE_EVENT_TYPE);
|
||||||
|
assertAnalyzer(analyzer);
|
||||||
|
|
||||||
|
DataSetRefs refs = analyzer.analyze(analysisContext, provenanceEvent);
|
||||||
|
assertAnalysisResult(refs, directory);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String createTransitUri(String directory) {
|
||||||
|
if (directory.equals("/")) {
|
||||||
|
return String.format("s3a://%s/%s", AWS_BUCKET, AWS_FILENAME);
|
||||||
|
} else {
|
||||||
|
return String.format("s3a://%s%s/%s", AWS_BUCKET, directory, AWS_FILENAME);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private ProvenanceEventRecord mockProvenanceEvent(String processorName, String transitUri) {
|
||||||
|
ProvenanceEventRecord provenanceEvent = Mockito.mock(ProvenanceEventRecord.class);
|
||||||
|
|
||||||
|
when(provenanceEvent.getComponentType()).thenReturn(processorName);
|
||||||
|
when(provenanceEvent.getTransitUri()).thenReturn(transitUri);
|
||||||
|
when(provenanceEvent.getEventType()).thenReturn(PROVENANCE_EVENT_TYPE);
|
||||||
|
|
||||||
|
return provenanceEvent;
|
||||||
|
}
|
||||||
|
|
||||||
|
private AnalysisContext mockAnalysisContext() {
|
||||||
|
NamespaceResolver namespaceResolver = Mockito.mock(NamespaceResolver.class);
|
||||||
|
when(namespaceResolver.fromHostNames(any())).thenReturn(ATLAS_NAMESPACE);
|
||||||
|
|
||||||
|
AnalysisContext analysisContext = Mockito.mock(AnalysisContext.class);
|
||||||
|
when(analysisContext.getNamespaceResolver()).thenReturn(namespaceResolver);
|
||||||
|
|
||||||
|
return analysisContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertAnalyzer(NiFiProvenanceEventAnalyzer analyzer) {
|
||||||
|
assertNotNull(analyzer);
|
||||||
|
assertEquals(AwsS3Directory.class, analyzer.getClass());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertAnalysisResult(DataSetRefs refs, String directory) {
|
||||||
|
String expectedDirectoryQualifiedName = String.format("s3a://%s%s@%s", AWS_BUCKET, directory, ATLAS_NAMESPACE);
|
||||||
|
String expectedBucketQualifiedName = String.format("s3a://%s@%s", AWS_BUCKET, ATLAS_NAMESPACE);
|
||||||
|
|
||||||
|
assertEquals(0, refs.getInputs().size());
|
||||||
|
assertEquals(1, refs.getOutputs().size());
|
||||||
|
|
||||||
|
Referenceable directoryRef = refs.getOutputs().iterator().next();
|
||||||
|
|
||||||
|
assertEquals("aws_s3_pseudo_dir", directoryRef.getTypeName());
|
||||||
|
assertEquals(expectedDirectoryQualifiedName, directoryRef.get(ATTR_QUALIFIED_NAME));
|
||||||
|
assertEquals(directory, directoryRef.get(ATTR_NAME));
|
||||||
|
assertEquals(directory, directoryRef.get("objectPrefix"));
|
||||||
|
|
||||||
|
Referenceable bucketRef = (Referenceable) directoryRef.get("bucket");
|
||||||
|
assertNotNull(bucketRef);
|
||||||
|
assertEquals("aws_s3_bucket", bucketRef.getTypeName());
|
||||||
|
assertEquals(expectedBucketQualifiedName, bucketRef.get(ATTR_QUALIFIED_NAME));
|
||||||
|
assertEquals(AWS_BUCKET, bucketRef.get(ATTR_NAME));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue