NIFI-7280 ReportLineageToAtlas recognizes 'atlas.metadata.namespace' from the Atlas config file.

'atlas.cluster.name' is still recognized as well, but it takes lower precedence than the new property.
The Atlas URL can also be provided via the 'atlas.rest.address' property in atlas-application.properties.
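For illustration, a minimal atlas-application.properties using these keys might look like this (all values are placeholders):

# Preferred key; takes precedence over 'atlas.cluster.name' when both are set
atlas.metadata.namespace=ns1
# Legacy key; still honored as a fallback
atlas.cluster.name=cl1
# Used when the 'Atlas URLs' property of the reporting task is not set
atlas.rest.address=http://atlas-host-1:21000,http://atlas-host-2:21000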

NIFI-7280 Improved documentation and adjusted property ordering in ReportLineageToAtlas for a better user experience. Minor refactor.
NIFI-7280 Amended documentation in ReportLineageToAtlas. Minor refactor.
NIFI-7280 Amended more documentation in ReportLineageToAtlas. More minor refactoring.
NIFI-7280 - In Atlas reporting: complete clusterName -> namespace overhaul where appropriate.

This closes #4213.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
Authored by Tamas Palfy on 2020-04-15 13:19:40 +02:00; committed by Peter Turcsanyi
parent c19db9d623
commit e2716a6c94
44 changed files with 856 additions and 420 deletions


@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util;
/**
* Fluent API for checking one or more strings and selecting the first non-empty one.
* <p>
* {@link #toString()} returns the first encountered non-empty string, or "" if none was found.
* <p>
* Optimized so that no intermediate objects are created; a single one is created once the first non-empty string is found.
*/
public interface StringSelector {
/**
* Starts the fluent expression by checking the first string(s).
*
* @param strings The first string(s) to check if empty.
* @return a {@link StringSelector} that checked the first string.
*/
static StringSelector of(String... strings) {
return EMPTY_STRING_SELECTOR.or(strings);
}
/**
* Check the next string(s).
*
* @param strings The next string(s) to check if empty.
* @return a {@link StringSelector} that checked all strings so far.
*/
StringSelector or(String... strings);
/**
* May be used to stop processing subsequent inputs when a result is already available.
*
* @return true if a non-empty string has been found, false otherwise.
*/
boolean found();
StringSelector EMPTY_STRING_SELECTOR = new StringSelector() {
@Override
public String toString() {
return "";
}
@Override
public StringSelector or(String... strings) {
for (String string : strings) {
if (string != null && string.length() > 0) {
return new StringSelector() {
@Override
public StringSelector or(String... string) {
return this;
}
@Override
public String toString() {
return string;
}
@Override
public boolean found() {
return true;
}
};
}
}
return EMPTY_STRING_SELECTOR;
}
@Override
public boolean found() {
return false;
}
};
}
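A short usage sketch of the fluent API above (the inputs are illustrative):

// Picks "fallback": the earlier candidates are null or empty.
String value = StringSelector.of(null, "").or("fallback", "ignored").toString();

// found() lets callers skip further work once a non-empty string is known.
StringSelector selector = StringSelector.of(firstCandidate); // firstCandidate: any String variable
if (!selector.found()) {
    selector = selector.or("default");
}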


@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class StringSelectorTest {
@Test
public void testNull() {
test("", false, (String) null);
}
@Test
public void testEmpty() {
test("", false, "");
}
@Test
public void testNull_Empty() {
test("", false, null, "");
}
@Test
public void testNull_Empty_NonEmpty() {
test("expected", true, null, "", "expected");
}
@Test
public void testNonEmpty_Null_NonEmpty() {
test("expected", true, "expected", null, "other");
}
@Test
public void testNonEmpty_Empty_NonEmpty() {
test("expected", true, "expected", "", "other");
}
@Test
public void testTwoNonEmpty() {
test("expected", true, "expected", "other");
}
@Test
public void testManyInputsWithNoExpected() {
test(
"",
false,
new String[]{null, "", "", ""},
new String[]{null, null, ""},
new String[]{null, "", null}
);
}
@Test
public void testManyInputsWithExpectedInFirstBatch() {
test(
"expected",
true,
new String[]{null, "expected", "", ""},
new String[]{null, null, ""},
new String[]{null, "other", "andAnother"}
);
}
@Test
public void testManyInputsWithExpectedInLaterBatch() {
test(
"expected",
true,
new String[]{null, "", "", ""},
new String[]{null, null, "expected"},
new String[]{null, "other", "andAnother"}
);
}
public void test(String expected, boolean expectedFound, String... inputs) {
// GIVEN
// WHEN
StringSelector selector = StringSelector.of(inputs);
// THEN
boolean actualFound = selector.found();
String actual = selector.toString();
assertEquals(expected, actual);
assertEquals(expectedFound, actualFound);
}
public void test(String expected, boolean expectedFound, String[] firstInputs, String[]... otherInputs) {
// GIVEN
// WHEN
StringSelector selector = StringSelector.of(firstInputs);
for (String[] otherInput : otherInputs) {
selector = selector.or(otherInput);
if (selector.found()) {
assertEquals(expected, selector.toString());
} else {
assertEquals("", selector.toString());
}
}
// THEN
boolean actualFound = selector.found();
String actual = selector.toString();
assertEquals(expected, actual);
assertEquals(expectedFound, actualFound);
}
}


@ -36,15 +36,15 @@ public class AtlasUtils {
return guid != null && !guid.startsWith("-");
}
public static String toQualifiedName(String clusterName, String componentId) {
return componentId + "@" + clusterName;
public static String toQualifiedName(String namespace, String componentId) {
return componentId + "@" + namespace;
}
public static String getComponentIdFromQualifiedName(String qualifiedName) {
return qualifiedName.split("@")[0];
}
public static String getClusterNameFromQualifiedName(String qualifiedName) {
public static String getNamespaceFromQualifiedName(String qualifiedName) {
return qualifiedName.split("@")[1];
}
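A quick illustration of the qualified-name round trip above (identifiers are made up):

String qualifiedName = AtlasUtils.toQualifiedName("ns1", "processor-guid"); // "processor-guid@ns1"
String componentId = AtlasUtils.getComponentIdFromQualifiedName(qualifiedName); // "processor-guid"
String namespace = AtlasUtils.getNamespaceFromQualifiedName(qualifiedName); // "ns1"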


@ -178,13 +178,13 @@ public class NiFiAtlasClient {
/**
* Fetch existing NiFiFlow entity from Atlas.
* @param rootProcessGroupId The id of a NiFi flow root process group.
* @param clusterName The cluster name of a flow.
* @param namespace The namespace of a flow.
* @return A NiFiFlow instance filled with retrieved data from Atlas. Status objects are left blank, e.g. ProcessorStatus.
* @throws AtlasServiceException Thrown if requesting to Atlas API failed, including when the flow is not found.
*/
public NiFiFlow fetchNiFiFlow(String rootProcessGroupId, String clusterName) throws AtlasServiceException {
public NiFiFlow fetchNiFiFlow(String rootProcessGroupId, String namespace) throws AtlasServiceException {
final String qualifiedName = AtlasUtils.toQualifiedName(clusterName, rootProcessGroupId);
final String qualifiedName = AtlasUtils.toQualifiedName(namespace, rootProcessGroupId);
final AtlasObjectId flowId = new AtlasObjectId(TYPE_NIFI_FLOW, ATTR_QUALIFIED_NAME, qualifiedName);
final AtlasEntity.AtlasEntityWithExtInfo nifiFlowExt = searchEntityDef(flowId);
@ -198,7 +198,7 @@ public class NiFiAtlasClient {
final NiFiFlow nifiFlow = new NiFiFlow(rootProcessGroupId);
nifiFlow.setExEntity(nifiFlowEntity);
nifiFlow.setFlowName(toStr(attributes.get(ATTR_NAME)));
nifiFlow.setClusterName(clusterName);
nifiFlow.setNamespace(namespace);
nifiFlow.setUrl(toStr(attributes.get(ATTR_URL)));
nifiFlow.setDescription(toStr(attributes.get(ATTR_DESCRIPTION)));


@ -45,7 +45,6 @@ import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName;
import static org.apache.nifi.atlas.AtlasUtils.isGuidAssigned;
import static org.apache.nifi.atlas.AtlasUtils.isUpdated;
import static org.apache.nifi.atlas.AtlasUtils.updateMetadata;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_CLUSTER_NAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
@ -65,7 +64,7 @@ public class NiFiFlow {
private final String rootProcessGroupId;
private String flowName;
private String clusterName;
private String namespace;
private String url;
private String atlasGuid;
private AtlasEntity exEntity;
@ -112,13 +111,13 @@ public class NiFiFlow {
return rootProcessGroupId;
}
public String getClusterName() {
return clusterName;
public String getNamespace() {
return namespace;
}
public void setClusterName(String clusterName) {
updateMetadata(metadataUpdated, updateAudit, ATTR_CLUSTER_NAME, this.clusterName, clusterName);
this.clusterName = clusterName;
public void setNamespace(String namespace) {
updateMetadata(metadataUpdated, updateAudit, "namespace", this.namespace, namespace);
this.namespace = namespace;
atlasObjectId = createAtlasObjectId();
}
@ -370,7 +369,7 @@ public class NiFiFlow {
}
public String toQualifiedName(String componentId) {
return AtlasUtils.toQualifiedName(clusterName, componentId);
return AtlasUtils.toQualifiedName(namespace, componentId);
}
public enum EntityChangeType {


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance;
import org.apache.nifi.atlas.resolver.ClusterResolver;
import org.apache.nifi.atlas.resolver.NamespaceResolver;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.lineage.ComputeLineageResult;
@ -24,8 +24,8 @@ import org.apache.nifi.provenance.lineage.ComputeLineageResult;
import java.util.List;
public interface AnalysisContext {
String getNiFiClusterName();
ClusterResolver getClusterResolver();
String getNiFiNamespace();
NamespaceResolver getNamespaceResolver();
List<ConnectionStatus> findConnectionTo(String componentId);
List<ConnectionStatus> findConnectionFrom(String componentId);
ComputeLineageResult queryLineage(long eventId);


@ -17,7 +17,7 @@
package org.apache.nifi.atlas.provenance;
import org.apache.nifi.atlas.NiFiFlow;
import org.apache.nifi.atlas.resolver.ClusterResolver;
import org.apache.nifi.atlas.resolver.NamespaceResolver;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceRepository;
@ -34,13 +34,13 @@ public class StandardAnalysisContext implements AnalysisContext {
private final Logger logger = LoggerFactory.getLogger(StandardAnalysisContext.class);
private final NiFiFlow nifiFlow;
private final ClusterResolver clusterResolver;
private final NamespaceResolver namespaceResolver;
private final ProvenanceRepository provenanceRepository;
public StandardAnalysisContext(NiFiFlow nifiFlow, ClusterResolver clusterResolver,
public StandardAnalysisContext(NiFiFlow nifiFlow, NamespaceResolver namespaceResolver,
ProvenanceRepository provenanceRepository) {
this.nifiFlow = nifiFlow;
this.clusterResolver = clusterResolver;
this.namespaceResolver = namespaceResolver;
this.provenanceRepository = provenanceRepository;
}
@ -55,13 +55,13 @@ public class StandardAnalysisContext implements AnalysisContext {
}
@Override
public String getNiFiClusterName() {
return nifiFlow.getClusterName();
public String getNiFiNamespace() {
return nifiFlow.getNamespace();
}
@Override
public ClusterResolver getClusterResolver() {
return clusterResolver;
public NamespaceResolver getNamespaceResolver() {
return namespaceResolver;
}
private ComputeLineageResult getLineageResult(long eventId, ComputeLineageSubmission submission) {


@ -32,19 +32,21 @@ public abstract class AbstractHiveAnalyzer extends AbstractNiFiProvenanceEventAn
static final String TYPE_TABLE = "hive_table";
static final String ATTR_DB = "db";
protected Referenceable createDatabaseRef(String clusterName, String databaseName) {
protected Referenceable createDatabaseRef(String namespace, String databaseName) {
final Referenceable ref = new Referenceable(TYPE_DATABASE);
ref.set(ATTR_NAME, databaseName);
ref.set(ATTR_CLUSTER_NAME, clusterName);
ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(clusterName, databaseName));
// The attribute 'clusterName' is part of the 'hive_db' Atlas entity, so it cannot be renamed.
// Using 'namespace' as its value for lack of a better solution.
ref.set(ATTR_CLUSTER_NAME, namespace);
ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, databaseName));
return ref;
}
protected Referenceable createTableRef(String clusterName, Tuple<String, String> tableName) {
protected Referenceable createTableRef(String namespace, Tuple<String, String> tableName) {
final Referenceable ref = new Referenceable(TYPE_TABLE);
ref.set(ATTR_NAME, tableName.getValue());
ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(clusterName, toTableNameStr(tableName)));
ref.set(ATTR_DB, createDatabaseRef(clusterName, tableName.getKey()));
ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, toTableNameStr(tableName)));
ref.set(ATTR_DB, createDatabaseRef(namespace, tableName.getKey()));
return ref;
}
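For example, with namespace 'ns1' and table 'default.myTable', the references built above would carry roughly these attributes (illustrative values):

hive_db: name=default, clusterName=ns1, qualifiedName=default@ns1
hive_table: name=myTable, qualifiedName=default.myTable@ns1, db=<the hive_db reference above>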


@ -49,12 +49,12 @@ public class FilePath extends AbstractNiFiProvenanceEventAnalyzer {
public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
final Referenceable ref = new Referenceable(TYPE);
final URI uri = parseUri(event.getTransitUri());
final String clusterName;
final String namespace;
try {
// use hostname in uri if available for remote path.
final String uriHost = uri.getHost();
final String hostname = StringUtils.isEmpty(uriHost) ? InetAddress.getLocalHost().getHostName() : uriHost;
clusterName = context.getClusterResolver().fromHostNames(hostname);
namespace = context.getNamespaceResolver().fromHostNames(hostname);
} catch (UnknownHostException e) {
logger.warn("Failed to get localhost name due to " + e, e);
return null;
@ -63,7 +63,7 @@ public class FilePath extends AbstractNiFiProvenanceEventAnalyzer {
final String path = uri.getPath();
ref.set(ATTR_NAME, path);
ref.set(ATTR_PATH, path);
ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(clusterName, path));
ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, path));
return singleDataSetRef(event.getComponentId(), event.getEventType(), ref);
}


@ -34,7 +34,7 @@ import static org.apache.nifi.atlas.NiFiTypes.ATTR_URI;
/**
* Analyze a transit URI as a HBase table.
* <li>qualifiedName=tableName@clusterName (example: myTable@cl1)
* <li>qualifiedName=tableName@namespace (example: myTable@ns1)
* <li>name=tableName (example: myTable)
*/
public class HBaseTable extends AbstractNiFiProvenanceEventAnalyzer {
@ -57,11 +57,11 @@ public class HBaseTable extends AbstractNiFiProvenanceEventAnalyzer {
final Referenceable ref = new Referenceable(TYPE);
final String[] hostNames = splitHostNames(uriMatcher.group(1));
final String clusterName = context.getClusterResolver().fromHostNames(hostNames);
final String namespace = context.getNamespaceResolver().fromHostNames(hostNames);
final String tableName = uriMatcher.group(2);
ref.set(ATTR_NAME, tableName);
ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(clusterName, tableName));
ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, tableName));
// TODO: 'uri' is a mandatory attribute, but what should we set?
ref.set(ATTR_URI, transitUri);


@ -32,7 +32,7 @@ import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
/**
* Analyze a transit URI as a HDFS path.
* <li>qualifiedName=/path/fileName@clusterName (example: /app/warehouse/hive/db/default@cl1)
* <li>qualifiedName=/path/fileName@namespace (example: /app/warehouse/hive/db/default@ns1)
* <li>name=/path/fileName (example: /app/warehouse/hive/db/default)
*/
public class HDFSPath extends AbstractNiFiProvenanceEventAnalyzer {
@ -43,12 +43,14 @@ public class HDFSPath extends AbstractNiFiProvenanceEventAnalyzer {
public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
final Referenceable ref = new Referenceable(TYPE);
final URI uri = parseUri(event.getTransitUri());
final String clusterName = context.getClusterResolver().fromHostNames(uri.getHost());
final String namespace = context.getNamespaceResolver().fromHostNames(uri.getHost());
final String path = uri.getPath();
ref.set(ATTR_NAME, path);
ref.set(ATTR_PATH, path);
ref.set(ATTR_CLUSTER_NAME, clusterName);
ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(clusterName, path));
// The attribute 'clusterName' is part of the 'hdfs_path' Atlas entity, so it cannot be renamed.
// Using 'namespace' as its value for lack of a better solution.
ref.set(ATTR_CLUSTER_NAME, namespace);
ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, path));
return singleDataSetRef(event.getComponentId(), event.getEventType(), ref);
}


@ -39,13 +39,13 @@ import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.par
* <ul>
* <li>If a Provenance event has 'query.input.tables' or 'query.output.tables' attributes then 'hive_table' DataSet reference is created:
* <ul>
* <li>qualifiedName=tableName@clusterName (example: myTable@cl1)
* <li>qualifiedName=tableName@namespace (example: myTable@ns1)
* <li>name=tableName (example: myTable)
* </ul>
* </li>
* <li>If not, 'hive_database' DataSet reference is created from transit URI:
* <ul>
* <li>qualifiedName=dbName@clusterName (example: default@cl1)
* <li>qualifiedName=dbName@namespace (example: default@ns1)
* <li>dbName (example: default)
* </ul>
* </li>
@ -76,7 +76,7 @@ public class Hive2JDBC extends AbstractHiveAnalyzer {
return null;
}
final String clusterName = context.getClusterResolver().fromHostNames(splitHostNames(uriMatcher.group(1)));
final String namespace = context.getNamespaceResolver().fromHostNames(splitHostNames(uriMatcher.group(1)));
String connectedDatabaseName = null;
if (uriMatcher.groupCount() > 1) {
// Try to find connected database name from connection parameters.
@ -95,26 +95,26 @@ public class Hive2JDBC extends AbstractHiveAnalyzer {
if (inputTables.isEmpty() && outputTables.isEmpty()) {
// If input/output tables are unknown, create database level lineage.
// Handle case insensitivity of database and table names in Hive: send names uniformly in lower case
return getDatabaseRef(event.getComponentId(), event.getEventType(), clusterName, connectedDatabaseName.toLowerCase());
return getDatabaseRef(event.getComponentId(), event.getEventType(), namespace, connectedDatabaseName.toLowerCase());
}
final DataSetRefs refs = new DataSetRefs(event.getComponentId());
addRefs(refs, true, clusterName, inputTables);
addRefs(refs, false, clusterName, outputTables);
addRefs(refs, true, namespace, inputTables);
addRefs(refs, false, namespace, outputTables);
return refs;
}
private DataSetRefs getDatabaseRef(String componentId, ProvenanceEventType eventType,
String clusterName, String databaseName) {
final Referenceable ref = createDatabaseRef(clusterName, databaseName);
String namespace, String databaseName) {
final Referenceable ref = createDatabaseRef(namespace, databaseName);
return singleDataSetRef(componentId, eventType, ref);
}
private void addRefs(DataSetRefs refs, boolean isInput, String clusterName,
private void addRefs(DataSetRefs refs, boolean isInput, String namespace,
Set<Tuple<String, String>> tableNames) {
tableNames.forEach(tableName -> {
final Referenceable ref = createTableRef(clusterName, tableName);
final Referenceable ref = createTableRef(namespace, tableName);
if (isInput) {
refs.addInput(ref);
} else {


@ -34,7 +34,7 @@ import static org.apache.nifi.atlas.NiFiTypes.ATTR_URI;
/**
* Analyze a transit URI as a Kafka topic.
* <li>qualifiedName=topicName@clusterName (example: testTopic@cl1)
* <li>qualifiedName=topicName@namespace (example: testTopic@ns1)
* <li>name=topicName (example: testTopic)
*/
public class KafkaTopic extends AbstractNiFiProvenanceEventAnalyzer {
@ -63,13 +63,13 @@ public class KafkaTopic extends AbstractNiFiProvenanceEventAnalyzer {
}
final String[] hostNames = splitHostNames(uriMatcher.group(1));
final String clusterName = context.getClusterResolver().fromHostNames(hostNames);
final String namespace = context.getNamespaceResolver().fromHostNames(hostNames);
final String topicName = uriMatcher.group(2);
ref.set(ATTR_NAME, topicName);
ref.set(ATTR_TOPIC, topicName);
ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(clusterName, topicName));
ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, topicName));
ref.set(ATTR_URI, transitUri);
return singleDataSetRef(event.getComponentId(), event.getEventType(), ref);


@ -36,7 +36,7 @@ import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
/**
* Analyze a transit URI as a NiFi Site-to-Site remote input/output port.
* <li>qualifiedName=remotePortGUID@clusterName (example: 35dbc0ab-015e-1000-144c-a8d71255027d@cl1)
* <li>qualifiedName=remotePortGUID@namespace (example: 35dbc0ab-015e-1000-144c-a8d71255027d@ns1)
* <li>name=portName (example: input)
*/
public class NiFiRemotePort extends NiFiS2S {
@ -54,7 +54,7 @@ public class NiFiRemotePort extends NiFiS2S {
final boolean isRemoteInputPort = event.getComponentType().equals("Remote Input Port");
final String type = isRemoteInputPort ? TYPE_NIFI_INPUT_PORT : TYPE_NIFI_OUTPUT_PORT;
final S2SPort s2SPort = analyzeS2SPort(event, context.getClusterResolver());
final S2SPort s2SPort = analyzeS2SPort(event, context.getNamespaceResolver());
// Find connections that connects to/from the remote port.
final String componentId = event.getComponentId();
@ -70,7 +70,7 @@ public class NiFiRemotePort extends NiFiS2S {
final ConnectionStatus connection = connections.get(0);
final Referenceable ref = new Referenceable(type);
ref.set(ATTR_NAME, isRemoteInputPort ? connection.getDestinationName() : connection.getSourceName());
ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(s2SPort.clusterName, s2SPort.targetPortId));
ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(s2SPort.namespace, s2SPort.targetPortId));
return singleDataSetRef(event.getComponentId(), event.getEventType(), ref);
}


@ -54,7 +54,7 @@ public class NiFiRootGroupPort extends NiFiS2S {
final String type = isInputPort ? TYPE_NIFI_INPUT_PORT : TYPE_NIFI_OUTPUT_PORT;
final String rootPortId = event.getComponentId();
final S2SPort s2SPort = analyzeS2SPort(event, context.getClusterResolver());
final S2SPort s2SPort = analyzeS2SPort(event, context.getNamespaceResolver());
// Find connections connecting to/from the remote port.
final List<ConnectionStatus> connections = isInputPort
@ -69,7 +69,7 @@ public class NiFiRootGroupPort extends NiFiS2S {
final ConnectionStatus connection = connections.get(0);
final Referenceable ref = new Referenceable(type);
ref.set(ATTR_NAME, isInputPort ? connection.getSourceName() : connection.getDestinationName());
ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(s2SPort.clusterName, rootPortId));
ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(s2SPort.namespace, rootPortId));
return singleDataSetRef(event.getComponentId(), event.getEventType(), ref);
}


@ -17,7 +17,7 @@
package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.resolver.ClusterResolver;
import org.apache.nifi.atlas.resolver.NamespaceResolver;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -32,7 +32,7 @@ public abstract class NiFiS2S extends AbstractNiFiProvenanceEventAnalyzer {
private static final Pattern RAW_URL_REGEX = Pattern.compile("nifi://([^:/]+):\\d+/([0-9a-zA-Z\\-]+)");
private static final Pattern HTTP_URL_REGEX = Pattern.compile(".*/nifi-api/data-transfer/(in|out)put-ports/([[0-9a-zA-Z\\-]]+)/transactions/.*");
protected S2SPort analyzeS2SPort(ProvenanceEventRecord event, ClusterResolver clusterResolver) {
protected S2SPort analyzeS2SPort(ProvenanceEventRecord event, NamespaceResolver namespaceResolver) {
final String transitUri = event.getTransitUri();
final int protocolIndex = transitUri.indexOf(':');
final String protocol = transitUri.substring(0, protocolIndex).toLowerCase();
@ -61,8 +61,8 @@ public abstract class NiFiS2S extends AbstractNiFiProvenanceEventAnalyzer {
}
final String clusterName = clusterResolver.fromHostNames(targetHostname);
return new S2SPort(clusterName, targetPortId);
final String namespace = namespaceResolver.fromHostNames(targetHostname);
return new S2SPort(namespace, targetPortId);
}
abstract protected String getRawProtocolPortId(ProvenanceEventRecord event);
@ -76,11 +76,11 @@ public abstract class NiFiS2S extends AbstractNiFiProvenanceEventAnalyzer {
}
protected static class S2SPort {
final String clusterName;
final String namespace;
final String targetPortId;
public S2SPort(String clusterName, String targetPortId) {
this.clusterName = clusterName;
public S2SPort(String namespace, String targetPortId) {
this.namespace = namespace;
this.targetPortId = targetPortId;
}
}


@ -30,7 +30,7 @@ import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.par
/**
* Analyze provenance events for PutHiveStreamingProcessor.
* <li>qualifiedName=tableName@clusterName (example: myTable@cl1)
* <li>qualifiedName=tableName@namespace (example: myTable@ns1)
* <li>name=tableName (example: myTable)
*/
public class PutHiveStreaming extends AbstractHiveAnalyzer {
@ -42,7 +42,7 @@ public class PutHiveStreaming extends AbstractHiveAnalyzer {
}
final URI uri = parseUri(event.getTransitUri());
final String clusterName = context.getClusterResolver().fromHostNames(uri.getHost());
final String namespace = context.getNamespaceResolver().fromHostNames(uri.getHost());
final Set<Tuple<String, String>> outputTables = parseTableNames(null, event.getAttribute(ATTR_OUTPUT_TABLES));
if (outputTables.isEmpty()) {
return null;
@ -50,7 +50,7 @@ public class PutHiveStreaming extends AbstractHiveAnalyzer {
final DataSetRefs refs = new DataSetRefs(event.getComponentId());
outputTables.forEach(tableName -> {
final Referenceable ref = createTableRef(clusterName, tableName);
final Referenceable ref = createTableRef(namespace, tableName);
refs.addOutput(ref);
});
return refs;


@ -26,7 +26,7 @@ import java.util.List;
/**
* Analyze a CREATE event and create 'nifi_data' when there is no specific Analyzer implementation found.
* <li>qualifiedName=NiFiComponentId@clusterName (example: processor GUID@cl1)
* <li>qualifiedName=NiFiComponentId@namespace (example: processor GUID@ns1)
* <li>name=NiFiComponentType (example: GenerateFlowFile)
*/
public class Create extends UnknownInput {


@ -20,7 +20,7 @@ import org.apache.nifi.provenance.ProvenanceEventType;
/**
* Analyze a FETCH event and create 'nifi_data' when there is no specific Analyzer implementation found.
* <li>qualifiedName=NiFiComponentId@clusterName (example: processor GUID@cl1)
* <li>qualifiedName=NiFiComponentId@namespace (example: processor GUID@ns1)
* <li>name=NiFiComponentType (example: FetchXXX)
*/
public class Fetch extends UnknownInput {


@ -20,7 +20,7 @@ import org.apache.nifi.provenance.ProvenanceEventType;
/**
* Analyze a RECEIVE event and create 'nifi_data' when there is no specific Analyzer implementation found.
* <li>qualifiedName=NiFiComponentId@clusterName (example: processor GUID@cl1)
* <li>qualifiedName=NiFiComponentId@namespace (example: processor GUID@ns1)
* <li>name=NiFiComponentType (example: GetXXX)
*/
public class Receive extends UnknownInput {


@ -20,7 +20,7 @@ import org.apache.nifi.provenance.ProvenanceEventType;
/**
* Analyze a REMOTE_INVOCATION event and create 'nifi_data' when there is no specific Analyzer implementation found.
* <li>qualifiedName=NiFiComponentId@clusterName (example: processor GUID@cl1)
* <li>qualifiedName=NiFiComponentId@namespace (example: processor GUID@ns1)
* <li>name=NiFiComponentType (example: XXX)
*/
public class RemoteInvocation extends UnknownOutput {


@ -20,7 +20,7 @@ import org.apache.nifi.provenance.ProvenanceEventType;
/**
* Analyze a SEND event and create 'nifi_data' when there is no specific Analyzer implementation found.
* <li>qualifiedName=NiFiComponentId@clusterName (example: processor GUID@cl1)
* <li>qualifiedName=NiFiComponentId@namespace (example: processor GUID@ns1)
* <li>name=NiFiComponentType (example: PutXXX)
*/
public class Send extends UnknownOutput {


@ -33,7 +33,7 @@ public abstract class UnknownDataSet extends AbstractNiFiProvenanceEventAnalyzer
protected Referenceable createDataSetRef(AnalysisContext context, ProvenanceEventRecord event) {
final Referenceable ref = new Referenceable(TYPE);
ref.set(ATTR_NAME, event.getComponentType());
ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(context.getNiFiClusterName(), event.getComponentId()));
ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(context.getNiFiNamespace(), event.getComponentId()));
ref.set(ATTR_DESCRIPTION, event.getEventType() + " was performed by " + event.getComponentType());
return ref;
}


@ -90,11 +90,11 @@ public abstract class AbstractLineageStrategy implements LineageStrategy {
protected void addDataSetRefs(NiFiFlow nifiFlow, Set<NiFiFlowPath> flowPaths, DataSetRefs refs) {
// create reference to NiFi flow path.
final Referenceable flowRef = toReferenceable(nifiFlow);
final String clusterName = nifiFlow.getClusterName();
final String namespace = nifiFlow.getNamespace();
final String url = nifiFlow.getUrl();
for (NiFiFlowPath flowPath : flowPaths) {
final Referenceable flowPathRef = toReferenceable(flowPath, flowRef, clusterName, url);
final Referenceable flowPathRef = toReferenceable(flowPath, flowRef, namespace, url);
addDataSetRefs(refs, flowPathRef);
}
}
@ -109,13 +109,13 @@ public abstract class AbstractLineageStrategy implements LineageStrategy {
protected Referenceable toReferenceable(NiFiFlowPath flowPath, NiFiFlow nifiFlow) {
return toReferenceable(flowPath, toReferenceable(nifiFlow),
nifiFlow.getClusterName(), nifiFlow.getUrl());
nifiFlow.getNamespace(), nifiFlow.getUrl());
}
private Referenceable toReferenceable(NiFiFlowPath flowPath, Referenceable flowRef, String clusterName, String nifiUrl) {
private Referenceable toReferenceable(NiFiFlowPath flowPath, Referenceable flowRef, String namespace, String nifiUrl) {
final Referenceable flowPathRef = new Referenceable(TYPE_NIFI_FLOW_PATH);
flowPathRef.set(ATTR_NAME, flowPath.getName());
flowPathRef.set(ATTR_QUALIFIED_NAME, flowPath.getId() + "@" + clusterName);
flowPathRef.set(ATTR_QUALIFIED_NAME, flowPath.getId() + "@" + namespace);
flowPathRef.set(ATTR_NIFI_FLOW, flowRef);
flowPathRef.set(ATTR_URL, flowPath.createDeepLinkURL(nifiUrl));
// Referenceable has to have GUID assigned, otherwise it will not be stored due to lack of required attribute.


@ -256,7 +256,7 @@ public class CompleteFlowPathLineage extends AbstractLineageStrategy {
// In order to differentiate a queue between parents and this flow_path, add the hash into the queue qname.
// E.g. if FF1 and FF2 read from dirA and were merged, while FF3 and FF4 read from dirB and were merged, then passed here, these should be different queues.
if (queueBetweenParent != null) {
queueBetweenParent.set(ATTR_QUALIFIED_NAME, toQualifiedName(nifiFlow.getClusterName(), firstComponentId + "::" + hash));
queueBetweenParent.set(ATTR_QUALIFIED_NAME, toQualifiedName(nifiFlow.getNamespace(), firstComponentId + "::" + hash));
}
// If the same components emitted multiple provenance events consecutively, merge it to come up with a simpler name.


@ -40,9 +40,9 @@ import org.apache.nifi.atlas.provenance.StandardAnalysisContext;
import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage;
import org.apache.nifi.atlas.provenance.lineage.LineageStrategy;
import org.apache.nifi.atlas.provenance.lineage.SimpleFlowPathLineage;
import org.apache.nifi.atlas.resolver.ClusterResolver;
import org.apache.nifi.atlas.resolver.ClusterResolvers;
import org.apache.nifi.atlas.resolver.RegexClusterResolver;
import org.apache.nifi.atlas.resolver.NamespaceResolver;
import org.apache.nifi.atlas.resolver.NamespaceResolvers;
import org.apache.nifi.atlas.resolver.RegexNamespaceResolver;
import org.apache.nifi.atlas.security.AtlasAuthN;
import org.apache.nifi.atlas.security.Basic;
import org.apache.nifi.atlas.security.Kerberos;
@ -66,6 +66,7 @@ import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StringSelector;
import java.io.File;
import java.io.FileInputStream;
@ -89,6 +90,7 @@ import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.commons.lang3.StringUtils.isEmpty;
@ -103,20 +105,22 @@ import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.
" in addition to NiFi provenance events providing detailed event level lineage." +
" See 'Additional Details' for further description and limitations.")
@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last event Id so that on restart the task knows where it left off.")
@DynamicProperty(name = "hostnamePattern.<ClusterName>", value = "hostname Regex patterns",
description = RegexClusterResolver.PATTERN_PROPERTY_PREFIX_DESC, expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
@DynamicProperty(name = "hostnamePattern.<namespace>", value = "hostname Regex patterns",
description = RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX_DESC, expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
// In order for each reporting task instance to have its own static objects such as KafkaNotification.
@RequiresInstanceClassLoading
public class ReportLineageToAtlas extends AbstractReportingTask {
private static final String ATLAS_URL_DELIMITER = ",";
static final PropertyDescriptor ATLAS_URLS = new PropertyDescriptor.Builder()
.name("atlas-urls")
.displayName("Atlas URLs")
.description("Comma separated URL of Atlas Servers" +
" (e.g. http://atlas-server-hostname:21000 or https://atlas-server-hostname:21443)." +
" For accessing Atlas behind Knox gateway, specify Knox gateway URL" +
" (e.g. https://knox-hostname:8443/gateway/{topology-name}/atlas).")
.required(true)
" (e.g. https://knox-hostname:8443/gateway/{topology-name}/atlas)." +
" If not specified, 'atlas.rest.address' in Atlas Configuration File is used.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
@ -192,11 +196,11 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
public static final PropertyDescriptor ATLAS_DEFAULT_CLUSTER_NAME = new PropertyDescriptor.Builder()
.name("atlas-default-cluster-name")
.displayName("Atlas Default Cluster Name")
.description("Cluster name for Atlas entities reported by this ReportingTask." +
" If not specified, 'atlas.cluster.name' in Atlas Configuration File is used." +
" Cluster name mappings can be configured by user defined properties." +
" See additional detail for detail.")
.displayName("Atlas Default Metadata Namespace")
.description("Namespace for Atlas entities reported by this ReportingTask." +
" If not specified, 'atlas.metadata.namespace' or 'atlas.cluster.name' (the former having priority) in Atlas Configuration File is used." +
" Multiple mappings can be configured by user defined properties." +
" See 'Additional Details...' for more.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
@ -214,10 +218,10 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
.defaultValue("false")
.build();
static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
static final PropertyDescriptor KAFKA_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("ssl-context-service")
.displayName("SSL Context Service")
.description("Specifies the SSL Context Service to use for communicating with Atlas and Kafka.")
.displayName("Kafka SSL Context Service")
.description("Specifies the SSL Context Service to use for communicating with Kafka.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
@ -249,9 +253,9 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
.defaultValue(SEC_PLAINTEXT.getValue())
.build();
public static final PropertyDescriptor NIFI_KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
public static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
.name("nifi-kerberos-principal")
.displayName("NiFi Kerberos Principal")
.displayName("Kerberos Principal")
.description("The Kerberos principal for this NiFi instance to access Atlas API and Kafka brokers." +
" If not set, it is expected to set a JAAS configuration file in the JVM properties defined in the bootstrap.conf file." +
" This principal will be set into 'sasl.jaas.config' Kafka's property.")
@ -259,9 +263,9 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor NIFI_KERBEROS_KEYTAB = new PropertyDescriptor.Builder()
public static final PropertyDescriptor KERBEROS_KEYTAB = new PropertyDescriptor.Builder()
.name("nifi-kerberos-keytab")
.displayName("NiFi Kerberos Keytab")
.displayName("Kerberos Keytab")
.description("The Kerberos keytab for this NiFi instance to access Atlas API and Kafka brokers." +
" If not set, it is expected to set a JAAS configuration file in the JVM properties defined in the bootstrap.conf file." +
" This principal will be set into 'sasl.jaas.config' Kafka's property.")
@ -298,9 +302,9 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
"Create separate 'nifi_flow_path' Atlas Processes for each distinct input and output DataSet combinations" +
" by looking at the complete route for a given FlowFile. See also 'Additional Details.");
static final PropertyDescriptor NIFI_LINEAGE_STRATEGY = new PropertyDescriptor.Builder()
static final PropertyDescriptor LINEAGE_STRATEGY = new PropertyDescriptor.Builder()
.name("nifi-lineage-strategy")
.displayName("NiFi Lineage Strategy")
.displayName("Lineage Strategy")
.description("Specifies granularity on how NiFi data flow should be reported to Atlas." +
" NOTE: It is strongly recommended to keep using the same strategy once this reporting task started to keep Atlas data clean." +
" Switching strategies will not delete Atlas entities created by the old strategy." +
@ -314,55 +318,61 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
private static final String ATLAS_PROPERTIES_FILENAME = "atlas-application.properties";
private static final String ATLAS_PROPERTY_CLIENT_CONNECT_TIMEOUT_MS = "atlas.client.connectTimeoutMSecs";
private static final String ATLAS_PROPERTY_CLIENT_READ_TIMEOUT_MS = "atlas.client.readTimeoutMSecs";
private static final String ATLAS_PROPERTY_METADATA_NAMESPACE = "atlas.metadata.namespace";
private static final String ATLAS_PROPERTY_CLUSTER_NAME = "atlas.cluster.name";
private static final String ATLAS_PROPERTY_REST_ADDRESS = "atlas.rest.address";
private static final String ATLAS_PROPERTY_ENABLE_TLS = "atlas.enableTLS";
private static final String ATLAS_KAFKA_PREFIX = "atlas.kafka.";
private static final String ATLAS_PROPERTY_KAFKA_BOOTSTRAP_SERVERS = ATLAS_KAFKA_PREFIX + "bootstrap.servers";
private static final String ATLAS_PROPERTY_KAFKA_CLIENT_ID = ATLAS_KAFKA_PREFIX + ProducerConfig.CLIENT_ID_CONFIG;
private final ServiceLoader<ClusterResolver> clusterResolverLoader = ServiceLoader.load(ClusterResolver.class);
private final ServiceLoader<NamespaceResolver> namespaceResolverLoader = ServiceLoader.load(NamespaceResolver.class);
private volatile AtlasAuthN atlasAuthN;
private volatile Properties atlasProperties;
private volatile boolean isTypeDefCreated = false;
private volatile String defaultClusterName;
private volatile String defaultMetadataNamespace;
private volatile ProvenanceEventConsumer consumer;
private volatile ClusterResolvers clusterResolvers;
private volatile NamespaceResolvers namespaceResolvers;
private volatile NiFiAtlasHook nifiAtlasHook;
private volatile LineageStrategy lineageStrategy;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
// Basic atlas config
properties.add(ATLAS_URLS);
properties.add(ATLAS_CONNECT_TIMEOUT);
properties.add(ATLAS_READ_TIMEOUT);
properties.add(ATLAS_CONF_DIR);
properties.add(ATLAS_CONF_CREATE);
properties.add(ATLAS_DEFAULT_CLUSTER_NAME);
// General config used by the processor
properties.add(LINEAGE_STRATEGY);
properties.add(PROVENANCE_START_POSITION);
properties.add(PROVENANCE_BATCH_SIZE);
properties.add(ATLAS_NIFI_URL);
properties.add(ATLAS_AUTHN_METHOD);
properties.add(ATLAS_USER);
properties.add(ATLAS_PASSWORD);
properties.add(ATLAS_CONF_DIR);
properties.add(ATLAS_NIFI_URL);
properties.add(ATLAS_DEFAULT_CLUSTER_NAME);
properties.add(NIFI_LINEAGE_STRATEGY);
properties.add(PROVENANCE_START_POSITION);
properties.add(PROVENANCE_BATCH_SIZE);
properties.add(SSL_CONTEXT_SERVICE);
// The following properties are required if ATLAS_CONF_CREATE is enabled;
// otherwise they should be left blank.
properties.add(ATLAS_CONF_CREATE);
// These will be used by the Atlas client, which reads their values from the Atlas config file
properties.add(KERBEROS_CREDENTIALS_SERVICE);
properties.add(NIFI_KERBEROS_PRINCIPAL);
properties.add(NIFI_KERBEROS_KEYTAB);
properties.add(KAFKA_KERBEROS_SERVICE_NAME);
properties.add(KERBEROS_PRINCIPAL);
properties.add(KERBEROS_KEYTAB);
properties.add(KAFKA_BOOTSTRAP_SERVERS);
properties.add(KAFKA_SECURITY_PROTOCOL);
properties.add(KAFKA_KERBEROS_SERVICE_NAME);
properties.add(KAFKA_SSL_CONTEXT_SERVICE);
properties.add(ATLAS_CONNECT_TIMEOUT);
properties.add(ATLAS_READ_TIMEOUT);
return properties;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
for (ClusterResolver resolver : clusterResolverLoader) {
for (NamespaceResolver resolver : namespaceResolverLoader) {
final PropertyDescriptor propertyDescriptor = resolver.getSupportedDynamicPropertyDescriptor(propertyDescriptorName);
if(propertyDescriptor != null) {
return propertyDescriptor;
@ -371,42 +381,35 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
return null;
}
private void parseAtlasUrls(final PropertyValue atlasUrlsProp, final Consumer<String> urlStrConsumer) {
final String atlasUrlsStr = atlasUrlsProp.evaluateAttributeExpressions().getValue();
if (atlasUrlsStr != null && !atlasUrlsStr.isEmpty()) {
Arrays.stream(atlasUrlsStr.split(","))
.map(String::trim)
.forEach(urlStrConsumer);
}
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext context) {
final Collection<ValidationResult> results = new ArrayList<>();
final boolean isSSLContextServiceSet = context.getProperty(SSL_CONTEXT_SERVICE).isSet();
final boolean isSSLContextServiceSet = context.getProperty(KAFKA_SSL_CONTEXT_SERVICE).isSet();
final ValidationResult.Builder invalidSSLService = new ValidationResult.Builder()
.subject(SSL_CONTEXT_SERVICE.getDisplayName()).valid(false);
parseAtlasUrls(context.getProperty(ATLAS_URLS), input -> {
final ValidationResult.Builder builder = new ValidationResult.Builder().subject(ATLAS_URLS.getDisplayName()).input(input);
try {
final URL url = new URL(input);
if ("https".equalsIgnoreCase(url.getProtocol()) && !isSSLContextServiceSet) {
results.add(invalidSSLService.explanation("required by HTTPS Atlas access").build());
} else {
results.add(builder.explanation("Valid URI").valid(true).build());
}
} catch (Exception e) {
results.add(builder.explanation("Contains invalid URI: " + e).valid(false).build());
}
});
.subject(KAFKA_SSL_CONTEXT_SERVICE.getDisplayName()).valid(false);
String atlasUrls = context.getProperty(ATLAS_URLS).evaluateAttributeExpressions().getValue();
if (!StringUtils.isEmpty(atlasUrls)) {
Arrays.stream(atlasUrls.split(ATLAS_URL_DELIMITER))
.map(String::trim)
.forEach(input -> {
final ValidationResult.Builder builder = new ValidationResult.Builder().subject(ATLAS_URLS.getDisplayName()).input(input);
try {
new URL(input);
results.add(builder.explanation("Valid URI").valid(true).build());
} catch (Exception e) {
results.add(builder.explanation("Contains invalid URI: " + e).valid(false).build());
}
});
}
final String atlasAuthNMethod = context.getProperty(ATLAS_AUTHN_METHOD).getValue();
final AtlasAuthN atlasAuthN = getAtlasAuthN(atlasAuthNMethod);
results.addAll(atlasAuthN.validate(context));
clusterResolverLoader.forEach(resolver -> results.addAll(resolver.validate(context)));
namespaceResolverLoader.forEach(resolver -> results.addAll(resolver.validate(context)));
if (context.getProperty(ATLAS_CONF_CREATE).asBoolean()) {
@ -430,8 +433,8 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
results.add(invalidSSLService.explanation("required by SSL Kafka connection").build());
}
final String explicitPrincipal = context.getProperty(NIFI_KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
final String explicitKeytab = context.getProperty(NIFI_KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
final String explicitPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
final String explicitKeytab = context.getProperty(KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
final KerberosCredentialsService credentialsService = context.getProperty(ReportLineageToAtlas.KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
@ -469,13 +472,13 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
// initAtlasClient has to be done first as it loads AtlasProperty.
initAtlasProperties(context);
initLineageStrategy(context);
initClusterResolvers(context);
initNamespaceResolvers(context);
}
private void initLineageStrategy(ConfigurationContext context) throws IOException {
nifiAtlasHook = new NiFiAtlasHook();
final String strategy = context.getProperty(NIFI_LINEAGE_STRATEGY).getValue();
final String strategy = context.getProperty(LINEAGE_STRATEGY).getValue();
if (LINEAGE_STRATEGY_SIMPLE_PATH.equals(strategy)) {
lineageStrategy = new SimpleFlowPathLineage();
} else if (LINEAGE_STRATEGY_COMPLETE_PATH.equals(strategy)) {
@ -486,20 +489,17 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
initProvenanceConsumer(context);
}
private void initClusterResolvers(ConfigurationContext context) {
final Set<ClusterResolver> loadedClusterResolvers = new LinkedHashSet<>();
clusterResolverLoader.forEach(resolver -> {
private void initNamespaceResolvers(ConfigurationContext context) {
final Set<NamespaceResolver> loadedNamespaceResolvers = new LinkedHashSet<>();
namespaceResolverLoader.forEach(resolver -> {
resolver.configure(context);
loadedClusterResolvers.add(resolver);
loadedNamespaceResolvers.add(resolver);
});
clusterResolvers = new ClusterResolvers(Collections.unmodifiableSet(loadedClusterResolvers), defaultClusterName);
namespaceResolvers = new NamespaceResolvers(Collections.unmodifiableSet(loadedNamespaceResolvers), defaultMetadataNamespace);
}
private void initAtlasProperties(ConfigurationContext context) throws IOException {
List<String> urls = new ArrayList<>();
parseAtlasUrls(context.getProperty(ATLAS_URLS), urls::add);
final boolean isAtlasApiSecure = urls.stream().anyMatch(url -> url.toLowerCase().startsWith("https"));
final String atlasAuthNMethod = context.getProperty(ATLAS_AUTHN_METHOD).getValue();
final String confDirStr = context.getProperty(ATLAS_CONF_DIR).evaluateAttributeExpressions().getValue();
@ -532,17 +532,18 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
}
}
// Resolve default cluster name.
defaultClusterName = context.getProperty(ATLAS_DEFAULT_CLUSTER_NAME).evaluateAttributeExpressions().getValue();
if (defaultClusterName == null || defaultClusterName.isEmpty()) {
// If default cluster name is not specified by processor configuration, then load it from Atlas config.
defaultClusterName = atlasProperties.getProperty(ATLAS_PROPERTY_CLUSTER_NAME);
}
List<String> urls = parseAtlasUrls(context.getProperty(ATLAS_URLS));
final boolean isAtlasApiSecure = urls.stream().anyMatch(url -> url.toLowerCase().startsWith("https"));
// If default cluster name is still not defined, processor should not be able to start.
if (defaultClusterName == null || defaultClusterName.isEmpty()) {
throw new ProcessException("Default cluster name is not defined.");
}
setValue(
value -> defaultMetadataNamespace = value,
() -> {
throw new ProcessException("Default metadata namespace (or cluster name) is not defined.");
},
context.getProperty(ATLAS_DEFAULT_CLUSTER_NAME),
atlasProperties.getProperty(ATLAS_PROPERTY_METADATA_NAMESPACE),
atlasProperties.getProperty(ATLAS_PROPERTY_CLUSTER_NAME)
);
String atlasConnectTimeoutMs = context.getProperty(ATLAS_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue() + "";
String atlasReadTimeoutMs = context.getProperty(ATLAS_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue() + "";
@ -555,9 +556,11 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
// enforce synchronous notification sending (needed for the checkpointing in ProvenanceEventConsumer)
atlasProperties.setProperty(AtlasHook.ATLAS_NOTIFICATION_ASYNCHRONOUS, "false");
atlasProperties.put(ATLAS_PROPERTY_REST_ADDRESS, urls.stream().collect(Collectors.joining(ATLAS_URL_DELIMITER)));
atlasProperties.put(ATLAS_PROPERTY_CLIENT_CONNECT_TIMEOUT_MS, atlasConnectTimeoutMs);
atlasProperties.put(ATLAS_PROPERTY_CLIENT_READ_TIMEOUT_MS, atlasReadTimeoutMs);
atlasProperties.put(ATLAS_PROPERTY_CLUSTER_NAME, defaultClusterName);
atlasProperties.put(ATLAS_PROPERTY_METADATA_NAMESPACE, defaultMetadataNamespace);
atlasProperties.put(ATLAS_PROPERTY_CLUSTER_NAME, defaultMetadataNamespace);
atlasProperties.put(ATLAS_PROPERTY_ENABLE_TLS, String.valueOf(isAtlasApiSecure));
setKafkaConfig(atlasProperties, context);
@ -585,19 +588,63 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
if (confDir != null) {
// If atlasConfDir is not set, atlas-application.properties will be searched under classpath.
Properties props = System.getProperties();
final String atlasConfProp = "atlas.conf";
final String atlasConfProp = ApplicationProperties.ATLAS_CONFIGURATION_DIRECTORY_PROPERTY;
props.setProperty(atlasConfProp, confDir.getAbsolutePath());
getLogger().debug("{} has been set to: {}", new Object[]{atlasConfProp, props.getProperty(atlasConfProp)});
}
}
private List<String> parseAtlasUrls(final PropertyValue atlasUrlsProp) {
List<String> atlasUrls = new ArrayList<>();
setValue(
value -> Arrays.stream(value.split(ATLAS_URL_DELIMITER))
.map(String::trim)
.forEach(urlString -> {
try {
new URL(urlString);
} catch (Exception e) {
throw new ProcessException(e);
}
atlasUrls.add(urlString);
}
),
() -> {
throw new ProcessException("No Atlas URL has been specified! Set either the '" + ATLAS_URLS.getDisplayName() + "' " +
"property on the processor or the 'atlas.rest.address' property in the atlas configuration file.");
},
atlasUrlsProp,
atlasProperties.getProperty(ATLAS_PROPERTY_REST_ADDRESS)
);
return atlasUrls;
}
private void setValue(Consumer<String> setter, Runnable emptyHandler, PropertyValue elEnabledPropertyValue, String... properties) {
StringSelector valueSelector = StringSelector
.of(elEnabledPropertyValue.evaluateAttributeExpressions().getValue())
.or(properties);
if (valueSelector.found()) {
setter.accept(valueSelector.toString());
} else {
emptyHandler.run();
}
}
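Conceptually, the helper above collapses each precedence chain into one StringSelector expression; for the metadata namespace it behaves like this sketch (using the constants defined earlier in this class):

StringSelector selector = StringSelector
        .of(context.getProperty(ATLAS_DEFAULT_CLUSTER_NAME).evaluateAttributeExpressions().getValue())
        .or(atlasProperties.getProperty(ATLAS_PROPERTY_METADATA_NAMESPACE),
            atlasProperties.getProperty(ATLAS_PROPERTY_CLUSTER_NAME));
if (selector.found()) {
    defaultMetadataNamespace = selector.toString();
} else {
    throw new ProcessException("Default metadata namespace (or cluster name) is not defined.");
}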
private void checkAtlasUrls(List<String> urlStrings, ConfigurationContext context) {
if (urlStrings.isEmpty()) {
throw new ProcessException("No Atlas URL has been specified! Set either the '" + ATLAS_URLS.getDisplayName() + "' " +
"property on the processor or the 'atlas.rest.address' porperty in the atlas configuration file.");
}
}
/**
* In order to avoid authentication expiration issues (i.e. Kerberos ticket and DelegationToken expiration),
* create Atlas client instance at every onTrigger execution.
*/
protected NiFiAtlasClient createNiFiAtlasClient(ReportingContext context) {
List<String> urls = new ArrayList<>();
parseAtlasUrls(context.getProperty(ATLAS_URLS), urls::add);
List<String> urls = parseAtlasUrls(context.getProperty(ATLAS_URLS));
try {
return new NiFiAtlasClient(atlasAuthN.createClient(urls.toArray(new String[]{})));
} catch (final NullPointerException e) {
@ -709,10 +756,10 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
final String nifiUrl = context.getProperty(ATLAS_NIFI_URL).evaluateAttributeExpressions().getValue();
final String clusterName;
final String namespace;
try {
final String nifiHostName = new URL(nifiUrl).getHost();
clusterName = clusterResolvers.fromHostNames(nifiHostName);
namespace = namespaceResolvers.fromHostNames(nifiHostName);
} catch (MalformedURLException e) {
throw new IllegalArgumentException("Failed to parse NiFi URL, " + e.getMessage(), e);
}
@ -720,10 +767,10 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
NiFiFlow existingNiFiFlow = null;
try {
// Retrieve Existing NiFiFlow from Atlas.
existingNiFiFlow = atlasClient.fetchNiFiFlow(rootProcessGroup.getId(), clusterName);
existingNiFiFlow = atlasClient.fetchNiFiFlow(rootProcessGroup.getId(), namespace);
} catch (AtlasServiceException e) {
if (ClientResponse.Status.NOT_FOUND.equals(e.getStatus())){
getLogger().debug("Existing flow was not found for {}@{}", new Object[]{rootProcessGroup.getId(), clusterName});
getLogger().debug("Existing flow was not found for {}@{}", new Object[]{rootProcessGroup.getId(), namespace});
} else {
throw new RuntimeException("Failed to fetch existing NiFi flow. " + e, e);
}
@ -732,7 +779,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
final NiFiFlow nifiFlow = existingNiFiFlow != null ? existingNiFiFlow : new NiFiFlow(rootProcessGroup.getId());
nifiFlow.setFlowName(flowName);
nifiFlow.setUrl(nifiUrl);
nifiFlow.setClusterName(clusterName);
nifiFlow.setNamespace(namespace);
final NiFiFlowAnalyzer flowAnalyzer = new NiFiFlowAnalyzer();
@ -744,7 +791,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
private void consumeNiFiProvenanceEvents(ReportingContext context, NiFiFlow nifiFlow) {
final EventAccess eventAccess = context.getEventAccess();
final AnalysisContext analysisContext = new StandardAnalysisContext(nifiFlow, clusterResolvers,
final AnalysisContext analysisContext = new StandardAnalysisContext(nifiFlow, namespaceResolvers,
// FIXME: This class cast shouldn't be necessary to query lineage. Possible refactor target in next major update.
(ProvenanceRepository)eventAccess.getProvenanceRepository());
consumer.consumeEvents(context, (componentMapHolder, events) -> {
@ -770,7 +817,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
mapToPopulate.put(ATLAS_KAFKA_PREFIX + "security.protocol", kafkaSecurityProtocol);
// Translate SSLContext Service configuration into Kafka properties
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final SSLContextService sslContextService = context.getProperty(KAFKA_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null && sslContextService.isKeyStoreConfigured()) {
mapToPopulate.put(ATLAS_KAFKA_PREFIX + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslContextService.getKeyStoreFile());
mapToPopulate.put(ATLAS_KAFKA_PREFIX + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslContextService.getKeyStorePassword());
@ -802,8 +849,8 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
private void setKafkaJaasConfig(Map<Object, Object> mapToPopulate, PropertyContext context) {
String keytab;
String principal;
final String explicitPrincipal = context.getProperty(NIFI_KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
final String explicitKeytab = context.getProperty(NIFI_KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
final String explicitPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
final String explicitKeytab = context.getProperty(KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
final KerberosCredentialsService credentialsService = context.getProperty(ReportLineageToAtlas.KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
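Conceptually, the explicit KERBEROS_PRINCIPAL/KERBEROS_KEYTAB values take precedence, with the KerberosCredentialsService as the fallback. A minimal sketch of that selection using the StringSelector utility introduced in this commit (the helper name and wiring are illustrative, not the commit's exact code):

// Illustrative only: the explicit property wins, the credentials service is the
// fallback, and StringSelector yields "" when neither is set.
private static String selectPrincipal(String explicitPrincipal, KerberosCredentialsService service) {
    return StringSelector
            .of(explicitPrincipal)
            .or(service == null ? null : service.getPrincipal())
            .toString();
}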

View File

@ -25,7 +25,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
public interface ClusterResolver {
public interface NamespaceResolver {
default Collection<ValidationResult> validate(final ValidationContext validationContext) {
return Collections.emptySet();
@ -40,18 +40,18 @@ public interface ClusterResolver {
void configure(PropertyContext context);
/**
* Resolve a cluster name from a list of host names or an ip addresses.
* @param hostNames hostname or ip address
* @return resolved cluster name or null
* Resolve a namespace from a list of host names or IP addresses.
* @param hostNames host names or IP addresses
* @return resolved namespace or null
*/
default String fromHostNames(String ... hostNames) {
return null;
}
/**
* Resolve a cluster name from hints, such as Zookeeper Quorum, client port and znode path
* @param hints Contains variables to resolve a cluster name
* @return resolved cluster name or null
* Resolve a namespace from hints, such as Zookeeper Quorum, client port and znode path
* @param hints Contains variables to resolve a namespace
* @return resolved namespace or null
*/
default String fromHints(Map<String, String> hints) {
return null;

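To make the renamed contract concrete, a minimal custom resolver might look like the following (hypothetical class; the remaining interface methods keep the defaults shown above):

// Hypothetical resolver mapping production hostnames to a fixed namespace.
public class StaticNamespaceResolver implements NamespaceResolver {

    @Override
    public void configure(PropertyContext context) {
        // nothing to configure in this sketch
    }

    @Override
    public String fromHostNames(String... hostNames) {
        for (String hostName : hostNames) {
            if (hostName != null && hostName.endsWith(".prod.example.com")) {
                return "production";
            }
        }
        return null; // defer to other resolvers or the default namespace
    }
}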
View File

@ -26,19 +26,19 @@ import java.util.Collection;
import java.util.Map;
import java.util.Set;
public class ClusterResolvers implements ClusterResolver {
private final Set<ClusterResolver> resolvers;
public class NamespaceResolvers implements NamespaceResolver {
private final Set<NamespaceResolver> resolvers;
private final String defaultClusterName;
private final String defaultNamespace;
public ClusterResolvers(Set<ClusterResolver> resolvers, String defaultClusterName) {
public NamespaceResolvers(Set<NamespaceResolver> resolvers, String defaultNamespace) {
this.resolvers = resolvers;
this.defaultClusterName = defaultClusterName;
this.defaultNamespace = defaultNamespace;
}
@Override
public PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
for (ClusterResolver resolver : resolvers) {
for (NamespaceResolver resolver : resolvers) {
final PropertyDescriptor descriptor = resolver.getSupportedDynamicPropertyDescriptor(propertyDescriptorName);
if (descriptor != null) {
return descriptor;
@ -50,7 +50,7 @@ public class ClusterResolvers implements ClusterResolver {
@Override
public Collection<ValidationResult> validate(ValidationContext validationContext) {
Collection<ValidationResult> results = new ArrayList<>();
for (ClusterResolver resolver : resolvers) {
for (NamespaceResolver resolver : resolvers) {
results.addAll(resolver.validate(validationContext));
}
return results;
@ -58,30 +58,30 @@ public class ClusterResolvers implements ClusterResolver {
@Override
public void configure(PropertyContext context) {
for (ClusterResolver resolver : resolvers) {
for (NamespaceResolver resolver : resolvers) {
resolver.configure(context);
}
}
@Override
public String fromHostNames(String ... hostNames) {
for (ClusterResolver resolver : resolvers) {
final String clusterName = resolver.fromHostNames(hostNames);
if (clusterName != null && !clusterName.isEmpty()) {
return clusterName;
for (NamespaceResolver resolver : resolvers) {
final String namespace = resolver.fromHostNames(hostNames);
if (namespace != null && !namespace.isEmpty()) {
return namespace;
}
}
return defaultClusterName;
return defaultNamespace;
}
@Override
public String fromHints(Map<String, String> hints) {
for (ClusterResolver resolver : resolvers) {
final String clusterName = resolver.fromHints(hints);
if (clusterName != null && !clusterName.isEmpty()) {
return clusterName;
for (NamespaceResolver resolver : resolvers) {
final String namespace = resolver.fromHints(hints);
if (namespace != null && !namespace.isEmpty()) {
return namespace;
}
}
return defaultClusterName;
return defaultNamespace;
}
}

View File

@ -34,12 +34,12 @@ import java.util.function.BiConsumer;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class RegexClusterResolver implements ClusterResolver {
public class RegexNamespaceResolver implements NamespaceResolver {
public static final String PATTERN_PROPERTY_PREFIX = "hostnamePattern.";
public static final String PATTERN_PROPERTY_PREFIX_DESC = "White space delimited (including new line) Regular Expressions" +
" to resolve a 'Cluster Name' from a hostname or IP address of a transit URI of NiFi provenance record.";
private Map<String, Set<Pattern>> clusterNamePatterns;
" to resolve a namespace from a hostname or IP address of a transit URI of NiFi provenance record.";
private Map<String, Set<Pattern>> namespacePatterns;
@Override
public PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
@ -60,7 +60,7 @@ public class RegexClusterResolver implements ClusterResolver {
public Collection<ValidationResult> validate(ValidationContext validationContext) {
final List<ValidationResult> validationResults = new ArrayList<>();
consumeConfigurations(validationContext.getAllProperties(),
(clusterNamePatterns, patterns) -> {},
(namespacePatterns, patterns) -> {},
(entry, e) -> {
final ValidationResult result = new ValidationResult.Builder()
.subject(entry.getKey())
@ -76,9 +76,9 @@ public class RegexClusterResolver implements ClusterResolver {
@Override
public void configure(PropertyContext context) {
clusterNamePatterns = new HashMap<>();
namespacePatterns = new HashMap<>();
consumeConfigurations(context.getAllProperties(),
(clusterName, patterns) -> clusterNamePatterns.put(clusterName, patterns),
(namespace, patterns) -> namespacePatterns.put(namespace, patterns),
null);
}
@ -89,15 +89,15 @@ public class RegexClusterResolver implements ClusterResolver {
allProperties.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(PATTERN_PROPERTY_PREFIX))
.forEach(entry -> {
final String clusterName;
final String namespace;
final Set<Pattern> patterns;
try {
clusterName = entry.getKey().substring(PATTERN_PROPERTY_PREFIX.length());
namespace = entry.getKey().substring(PATTERN_PROPERTY_PREFIX.length());
final String[] regexsArray = entry.getValue().split("\\s");
final List<String> regexs = Arrays.stream(regexsArray)
.map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
patterns = parseClusterNamePatterns(clusterName, regexs);
consumer.accept(clusterName, patterns);
patterns = parseNamespacePatterns(namespace, regexs);
consumer.accept(namespace, patterns);
} catch (RuntimeException e) {
if (errorHandler != null) {
errorHandler.accept(entry, e);
@ -108,14 +108,14 @@ public class RegexClusterResolver implements ClusterResolver {
});
}
private Set<Pattern> parseClusterNamePatterns(final String clusterName, List<String> regexs) {
if (clusterName == null || clusterName.isEmpty()) {
throw new IllegalArgumentException("Empty cluster name is not allowed.");
private Set<Pattern> parseNamespacePatterns(final String namespace, List<String> regexs) {
if (namespace == null || namespace.isEmpty()) {
throw new IllegalArgumentException("Empty namespace is not allowed.");
}
if (regexs.size() == 0) {
throw new IllegalArgumentException(
String.format("At least one cluster name pattern is required, [%s].", clusterName));
String.format("At least one namespace pattern is required, [%s].", namespace));
}
return regexs.stream().map(Pattern::compile).collect(Collectors.toSet());
@ -123,7 +123,7 @@ public class RegexClusterResolver implements ClusterResolver {
@Override
public String fromHostNames(String ... hostNames) {
for (Map.Entry<String, Set<Pattern>> entry : clusterNamePatterns.entrySet()) {
for (Map.Entry<String, Set<Pattern>> entry : namespacePatterns.entrySet()) {
for (Pattern pattern : entry.getValue()) {
for (String hostname : hostNames) {
if (pattern.matcher(hostname).matches()) {

View File

@ -31,8 +31,8 @@ import java.util.Collection;
import java.util.List;
import java.util.Properties;
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_KEYTAB;
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_PRINCIPAL;
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.KERBEROS_KEYTAB;
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.KERBEROS_PRINCIPAL;
public class Kerberos implements AtlasAuthN {
private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
@ -44,8 +44,8 @@ public class Kerberos implements AtlasAuthN {
public Collection<ValidationResult> validate(ValidationContext context) {
final List<ValidationResult> problems = new ArrayList<>();
final String explicitPrincipal = context.getProperty(NIFI_KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
final String explicitKeytab = context.getProperty(NIFI_KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
final String explicitPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
final String explicitKeytab = context.getProperty(KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
final KerberosCredentialsService credentialsService = context.getProperty(ReportLineageToAtlas.KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
@ -95,8 +95,8 @@ public class Kerberos implements AtlasAuthN {
@Override
public void configure(PropertyContext context) {
final String explicitPrincipal = context.getProperty(NIFI_KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
final String explicitKeytab = context.getProperty(NIFI_KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
final String explicitPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
final String explicitKeytab = context.getProperty(KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
final KerberosCredentialsService credentialsService = context.getProperty(ReportLineageToAtlas.KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);

View File

@ -12,4 +12,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.atlas.resolver.RegexClusterResolver
org.apache.nifi.atlas.resolver.RegexNamespaceResolver

View File

@ -27,7 +27,7 @@
<ul>
<li><a href="#how-it-works">Information reported to Atlas</a></li>
<li><a href="#nifi-atlas-types">NiFi Atlas Types</a></li>
<li><a href="#cluster-name">Cluster Name Resolution</a></li>
<li><a href="#namespaces">Namespaces (formerly Cluster Name Resolution)</a></li>
<li><a href="#nifi-flow-structure">NiFi flow structure</a>
<ul>
<li><a href="#path-separation">Path Separation Logic</a></li>
@ -58,7 +58,7 @@
<p>Technically each information is sent using different protocol, Atlas REST API v2, and Notification via a Kafka topic as shown in above image.</p>
<p>As both information types use the same <a href="#nifi-atlas-types">NiFi Atlas Types</a> and <a href="#cluster-name">Cluster Name Resolution</a> concepts, it is recommended to start reading those sections first.</p>
<p>As both information types use the same <a href="#nifi-atlas-types">NiFi Atlas Types</a> and <a href="#namespaces">Namespaces</a> concepts, it is recommended to start reading those sections first.</p>
<h3 id="nifi-atlas-types">NiFi Atlas Types</h3>
@ -82,7 +82,7 @@
</li>
Attributes:
<ul>
<li>qualifiedName: Root ProcessGroup ID@clusterName (e.g. 86420a14-2fab-3e1e-4331-fb6ab42f58e0@cl1)</li>
<li>qualifiedName: Root ProcessGroup ID@namespace (e.g. 86420a14-2fab-3e1e-4331-fb6ab42f58e0@ns1)</li>
<li>name: Name of the Root ProcessGroup.</li>
<li>url: URL of the NiFi instance. This can be specified via reporting task 'NiFi URL for Atlas' property.</li>
</ul>
@ -91,7 +91,7 @@
<li>nifi_flow_path <p>Part of a NiFi data flow containing one or more processing NiFi components such as Processors and RemoteGroupPorts. The reporting task divides a NiFi flow into multiple flow paths. See <a href="#path-separation">Path Separation Logic</a> for details.</p></li>
Attributes:
<ul>
<li>qualifiedName: The first NiFi component Id in a path@clusterName (e.g. 529e6722-9b49-3b66-9c94-00da9863ca2d@cl1)</li>
<li>qualifiedName: The first NiFi component Id in a path@namespace (e.g. 529e6722-9b49-3b66-9c94-00da9863ca2d@ns1)</li>
<li>name: NiFi component namess within a path are concatenated (e.g. GenerateFlowFile, PutFile, LogAttribute)</li>
<li>url: A deep link to the first NiFi component in corresponding NiFi UI</li>
</ul>
@ -100,7 +100,7 @@
<li>nifi_input/output_port <p>Represents a RootGroupPort which can be accessed by RemoteProcessGroup via Site-to-Site protocol.</p></li>
Attributes:
<ul>
<li>qualifiedName: Port ID@clusterName (e.g. 3f6d405e-6e3d-38c9-c5af-ce158f8e593d@cl1)</li>
<li>qualifiedName: Port ID@namespace (e.g. 3f6d405e-6e3d-38c9-c5af-ce158f8e593d@ns1)</li>
<li>name: Name of the Port.</li>
</ul>
</ul>
@ -108,7 +108,7 @@
<li>nifi_data <p>Represents <a href="#unknown-datasets">Unknown DataSets</a> created by CREATE/SEND/RECEIVE NiFi provenance events those do not have particular provenance event analyzer.</p></li>
Attributes:
<ul>
<li>qualifiedName: ID of a Processor which generated the provenance event@clusterName (e.g. db8bb12c-5cd3-3011-c971-579f460ebedf@cl1)</li>
<li>qualifiedName: ID of a Processor which generated the provenance event@namespace (e.g. db8bb12c-5cd3-3011-c971-579f460ebedf@ns1)</li>
<li>name: Name of the Processor.</li>
</ul>
</ul>
@ -121,40 +121,41 @@
</ul>
</ul>
<h3 id="cluster-name">Cluster Name Resolution</h3>
<h3 id="namespaces">Namespaces</h3>
<p>An entity in Atlas can be identified by its GUID for any existing objects, or type name and unique attribute can be used if GUID is not known. Qualified name is commonly used as the unique attribute.</p>
<p>Since one Atlas instance can be used to manage multiple environments, i.e clusters, Atlas has to manage objects in different clusters those may have the same name. For example, a Hive table 'request_logs' in a 'cluster-A' and 'cluster-B'. In such case, cluster name embedded in qualified names are crucial.</p>
<p>One Atlas instance can be used to manage multiple environments and objects in different environments may have the same name. For example, a Hive table 'request_logs' in two different clusters, 'cluster-A' and 'cluster-B'. For this reason the qualified names contain a so-called metadata namespace.</p>
<p>It's common practice to provide the cluster name as the namespace, but it can be any arbitrary string.</p>
<p>For these requirements, a qualified name has 'componentId@clusterName' format. E.g. A Hive table qualified name would be dbName.tableName@clusterName (default.request_logs@cluster-A).</p>
<p>With this, a qualified name has 'componentId@namespace' format. E.g. A Hive table qualified name would be dbName.tableName@namespace (default.request_logs@cluster-A).</p>
<p>From this NiFi reporting task standpoint, a cluster name is need to be resolved at following situations:
<p>From this NiFi reporting task standpoint, a namespace is needed to be resolved at following situations:
<ul>
<li>To register NiFi component entities. Which cluster name should be used to represent the current NiFi cluster?</li>
<li>To create lineages from NiFi component to other DataSets. Which cluster does the DataSet resides?</li>
<li>To register NiFi component entities. Which namespace should be used to represent the current NiFi environment?</li>
<li>To create lineages from NiFi component to other DataSets. Which environment does the DataSet resides in?</li>
</ul>
</p>
<p>To answer such questions, ReportLineageToAtlas reporting task provides a way to define mappings from ip address or hostname to a cluster name.
The mapping can be defined by Dynamic Properties with a name in 'hostnamePattern.ClusterName' format, having its value as a set of Regular Expression Patterns to match ip addresses or host names to a particular cluster name.</p>
<p>To answer such questions, ReportLineageToAtlas reporting task provides a way to define mappings from IP address or hostname to a namespace.
The mapping can be defined by Dynamic Properties with a name in 'hostnamePattern.namespace' format, having its value as a set of Regular Expression Patterns to match IP addresses or host names to a particular namespace.</p>
<p>As an example, following mapping definition would resolve cluster name 'cluster-A' for ip address such as '192.168.30.123' or hostname 'namenode1.a.example.com', and 'cluster-B' for '192.168.40.223' or 'nifi3.b.example.com'.</p>
<p>As an example, following mapping definition would resolve namespace 'namespace-A' for IP address such as '192.168.30.123' or hostname 'namenode1.a.example.com', and 'namespace-B' for '192.168.40.223' or 'nifi3.b.example.com'.</p>
<pre>
# Dynamic Property Name for cluster-A
hostnamePattern.cluster-A
# Dynamic Property Name for namespace-A
hostnamePattern.namespace-A
# Value can have multiple Regular Expression patterns separated by new line
192\.168\.30\.\d+
[^\.]+\.a\.example\.com
# Dynamic Property Name for cluster-B
hostnamePattern.cluster-B
# Dynamic Property Name for namespace-B
hostnamePattern.namespace-B
# Values
192\.168\.40\.\d+
[^\.]+\.b\.example\.com
</pre>
<p>If any cluster name mapping does not match, then a name defined at 'Atlas Default Cluster Name' is used.</p>
<p>If no namespace mapping matches, then a name defined at 'Atlas Default Metadata Namespace' is used.</p>
<h3 id="nifi-flow-structure">NiFi flow structure</h3>
@ -271,11 +272,11 @@ Processor 3</pre>
<p>To identify such Process and DataSet Atlas entities, this reporting task uses NiFi Provenance Events. At least, the reporting task needs to derive following information from a NiFi Provenance event record:
<ul>
<li>typeName (e.g. fs_path, hive_table)</li>
<li>qualifiedName in uniqueId@clusterName (e.g. /data/A1.csv@BranchOffice1)</li>
<li>qualifiedName in uniqueId@namespace (e.g. /data/A1.csv@ns1)</li>
</ul>
</p>
<p>'clusterName' in 'qualifiedName' attribute is resolved by mapping ip-address or hostname available at NiFi Provenance event 'transitUri' to a cluster name. See <a href="cluster-name">Cluster Name Resolution</a> for detail.</p>
<p>'namespace' in 'qualifiedName' attribute is resolved by mapping ip-address or hostname available at NiFi Provenance event 'transitUri' to a namespace. See <a href="namespaces">Namespaces</a> for detail.</p>
<p>For 'typeName' and 'qualifiedName', different analysis rules are needed for different DataSet. ReportLineageToAtlas provides an extension point called 'NiFiProvenanceEventAnalyzer' to implement such analysis logic for particular DataSets.</p>
@ -327,8 +328,8 @@ Processor 3</pre>
<td>
nifi_input_port
</td>
<td>rootGroupPortGUID@clusterName
(e.g. 35dbc0ab-015e-1000-144c-a8d71255027d@cl1)</td>
<td>rootGroupPortGUID@namespace
(e.g. 35dbc0ab-015e-1000-144c-a8d71255027d@ns1)</td>
<td></td>
</tr>
<tr>
@ -343,7 +344,7 @@ upstream (nifi_flow_path)
(nifi_input_port)
</pre>
</td>
<td>remoteInputPortGUID@clusterName<br/>(e.g. f31a6b53-3077-4c59-144c-a8d71255027d@cl1)
<td>remoteInputPortGUID@namespace<br/>(e.g. f31a6b53-3077-4c59-144c-a8d71255027d@ns1)
<p>NOTE: The remoteInputPortGUID is the client side component ID and different from the remote target port GUID. Multiple Remote Input Ports can send to the same target remote input port.</p></td>
<td></td>
</tr>
@ -363,8 +364,8 @@ upstream (nifi_flow_path)
<td>
nifi_output_port
</td>
<td>rootGroupPortGUID@clusterName
(e.g. 45dbc0ab-015e-1000-144c-a8d71255027d@cl1)</td>
<td>rootGroupPortGUID@namespace
(e.g. 45dbc0ab-015e-1000-144c-a8d71255027d@ns1)</td>
<td></td>
</tr>
<tr>
@ -382,9 +383,9 @@ remote target port
</td>
<td>
<ul>
<li>For 'nifi_flow_path': remoteOutputPortGUID@clusterName<br/>(e.g. 7375f8f6-4604-468d-144c-a8d71255027d@cl1)
<li>For 'nifi_flow_path': remoteOutputPortGUID@namespace<br/>(e.g. 7375f8f6-4604-468d-144c-a8d71255027d@ns1)
<p>NOTE: The remoteOutputPortGUID is the client side component ID and different from the remote target port GUID. Multiple Remote Output Ports can pull from the same target remote output port.</p></li>
<li>For 'nifi_queue': downstreamPathGUID@clusterName<br/>(e.g. bb530e58-ee14-3cac-144c-a8d71255027d@cl1)</li>
<li>For 'nifi_queue': downstreamPathGUID@namespace<br/>(e.g. bb530e58-ee14-3cac-144c-a8d71255027d@ns1)</li>
</ul>
</td>
<td></td>
@ -409,7 +410,7 @@ remote target port
nifi_input_port<br/>
nifi_output_port
</td>
<td>rootGroupPortGUID@clusterName<br/>(e.g. 35dbc0ab-015e-1000-144c-a8d71255027d@cl1)</td>
<td>rootGroupPortGUID@namespace<br/>(e.g. 35dbc0ab-015e-1000-144c-a8d71255027d@ns1)</td>
<td></td>
</tr>
<tr>
@ -435,7 +436,7 @@ remote target port
(Protocol can be either PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL)
</td>
<td>kafka_topic</td>
<td>topicName@clusterName<br/>(e.g. testTopic@cl1)</td>
<td>topicName@namespace<br/>(e.g. testTopic@ns1)</td>
<td><strong>NOTE:</strong>With Atlas earlier than 0.8.2, the same topic name in different clusters can not be created using the pre-built 'kafka_topic'. See <a href="https://issues.apache.org/jira/browse/ATLAS-2286">ATLAS-2286</a>.</td>
</tr>
<tr>
@ -444,7 +445,7 @@ remote target port
<td>SEND</td>
<td>thrift://hive.example.com:9083</td>
<td>hive_table</td>
<td>tableName@clusterName<br/>(e.g. myTable@cl1)</td>
<td>tableName@namespace<br/>(e.g. myTable@ns1)</td>
<td></td>
</tr>
@ -460,7 +461,7 @@ remote target port
</td>
<td>jdbc:hive2://hive.example.com:10000/default</td>
<td>hive_table</td>
<td>tableName@clusterName<br/>(e.g. myTable@cl1)</td>
<td>tableName@namespace<br/>(e.g. myTable@ns1)</td>
<td>The corresponding Processors parse Hive QL to set 'query.input.tables' and 'query.output.tables' FlowFile attributes. These attribute values are used to create qualified name.</td>
</tr>
<tr>
@ -485,7 +486,7 @@ remote target port
</td>
<td>hdfs://nn.example.com:8020/user/nifi/5262553828219</td>
<td>hdfs_path</td>
<td>/path/fileName@clusterName<br/>(e.g. /app/warehouse/hive/db/default@cl1)</td>
<td>/path/fileName@namespace<br/>(e.g. /app/warehouse/hive/db/default@ns1)</td>
<td></td>
</tr>
<tr>
@ -508,7 +509,7 @@ remote target port
</td>
<td>hbase://hmaster.example.com:16000/tableA/rowX</td>
<td>hbase_table</td>
<td>tableName@clusterName<br/>(e.g. myTable@cl1)</td>
<td>tableName@namespace<br/>(e.g. myTable@ns1)</td>
<td></td>
</tr>
<tr>
@ -541,7 +542,7 @@ remote target port
</td>
<td></td>
<td>nifi_data</td>
<td>processorGuid@clusterName<br/>db8bb12c-5cd3-3011-c971-579f460ebedf@cl1</td>
<td>processorGuid@namespace<br/>db8bb12c-5cd3-3011-c971-579f460ebedf@ns1</td>
<td></td>
</tr>
</table>

View File

@ -63,10 +63,10 @@ public class TestNiFiFlowAnalyzer {
final NiFiFlowAnalyzer analyzer = new NiFiFlowAnalyzer();
final NiFiFlow nifiFlow = new NiFiFlow(rootPG.getId());
nifiFlow.setClusterName("cluster1");
nifiFlow.setNamespace("namespace1");
analyzer.analyzeProcessGroup(nifiFlow, rootPG);
assertEquals("1234-5678-0000-0000@cluster1", nifiFlow.getQualifiedName());
assertEquals("1234-5678-0000-0000@namespace1", nifiFlow.getQualifiedName());
}
private ProcessorStatus createProcessor(ProcessGroupStatus pgStatus, String type) {
@ -237,7 +237,7 @@ public class TestNiFiFlowAnalyzer {
final NiFiFlowAnalyzer analyzer = new NiFiFlowAnalyzer();
final NiFiFlow nifiFlow = new NiFiFlow(rootPG.getId());
nifiFlow.setClusterName("cluster1");
nifiFlow.setNamespace("namespace1");
analyzer.analyzeProcessGroup(nifiFlow, rootPG);
assertEquals(4, nifiFlow.getProcessors().size());
@ -259,7 +259,7 @@ public class TestNiFiFlowAnalyzer {
assertEquals(1, pathC.getInputs().size());
final AtlasObjectId queue = pathC.getInputs().iterator().next();
assertEquals(TYPE_NIFI_QUEUE, queue.getTypeName());
assertEquals(toQualifiedName("cluster1", pathC.getId()), queue.getUniqueAttributes().get(ATTR_QUALIFIED_NAME));
assertEquals(toQualifiedName("namespace1", pathC.getId()), queue.getUniqueAttributes().get(ATTR_QUALIFIED_NAME));
// Should be able to find a path from a given processor GUID.
final NiFiFlowPath pathForPr0 = nifiFlow.findPath(pr0.getId());

View File

@ -21,7 +21,7 @@ import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
import org.apache.nifi.atlas.resolver.ClusterResolvers;
import org.apache.nifi.atlas.resolver.NamespaceResolvers;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.junit.Test;
@ -45,11 +45,11 @@ public class TestHBaseTable {
when(record.getTransitUri()).thenReturn(transitUri);
when(record.getEventType()).thenReturn(ProvenanceEventType.FETCH);
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
assertNotNull(analyzer);
@ -60,7 +60,7 @@ public class TestHBaseTable {
Referenceable ref = refs.getInputs().iterator().next();
assertEquals("hbase_table", ref.getTypeName());
assertEquals("tableA", ref.get(ATTR_NAME));
assertEquals("tableA@cluster1", ref.get(ATTR_QUALIFIED_NAME));
assertEquals("tableA@namespace1", ref.get(ATTR_QUALIFIED_NAME));
}
@Test
@ -72,14 +72,14 @@ public class TestHBaseTable {
when(record.getTransitUri()).thenReturn(transitUri);
when(record.getEventType()).thenReturn(ProvenanceEventType.FETCH);
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(
matches("zk0.example.com"),
matches("zk2.example.com"),
matches("zk3.example.com"))).thenReturn("cluster1");
matches("zk3.example.com"))).thenReturn("namespace1");
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
assertNotNull(analyzer);
@ -90,7 +90,7 @@ public class TestHBaseTable {
Referenceable ref = refs.getInputs().iterator().next();
assertEquals("hbase_table", ref.getTypeName());
assertEquals("tableA", ref.get(ATTR_NAME));
assertEquals("tableA@cluster1", ref.get(ATTR_QUALIFIED_NAME));
assertEquals("tableA@namespace1", ref.get(ATTR_QUALIFIED_NAME));
}
}

View File

@ -21,7 +21,7 @@ import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
import org.apache.nifi.atlas.resolver.ClusterResolvers;
import org.apache.nifi.atlas.resolver.NamespaceResolvers;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.junit.Test;
@ -46,11 +46,11 @@ public class TestHDFSPath {
when(record.getTransitUri()).thenReturn(transitUri);
when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
assertNotNull(analyzer);
@ -61,6 +61,6 @@ public class TestHDFSPath {
Referenceable ref = refs.getOutputs().iterator().next();
assertEquals("hdfs_path", ref.getTypeName());
assertEquals("/user/nifi/fileA", ref.get(ATTR_NAME));
assertEquals("/user/nifi/fileA@cluster1", ref.get(ATTR_QUALIFIED_NAME));
assertEquals("/user/nifi/fileA@namespace1", ref.get(ATTR_QUALIFIED_NAME));
}
}

View File

@ -21,7 +21,7 @@ import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
import org.apache.nifi.atlas.resolver.ClusterResolvers;
import org.apache.nifi.atlas.resolver.NamespaceResolvers;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.junit.Test;
@ -56,11 +56,11 @@ public class TestHive2JDBC {
when(record.getTransitUri()).thenReturn(transitUri);
when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
assertNotNull(analyzer);
@ -71,7 +71,7 @@ public class TestHive2JDBC {
Referenceable ref = refs.getOutputs().iterator().next();
assertEquals("hive_db", ref.getTypeName());
assertEquals("database_a", ref.get(ATTR_NAME));
assertEquals("database_a@cluster1", ref.get(ATTR_QUALIFIED_NAME));
assertEquals("database_a@namespace1", ref.get(ATTR_QUALIFIED_NAME));
}
/**
@ -90,11 +90,11 @@ public class TestHive2JDBC {
when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("table_A1, table_A2");
when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_B.table_B1");
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
assertNotNull(analyzer);
@ -103,8 +103,8 @@ public class TestHive2JDBC {
assertEquals(2, refs.getInputs().size());
// QualifiedName : Name
final Map<String, String> expectedInputRefs = new HashMap<>();
expectedInputRefs.put("database_a.table_a1@cluster1", "table_a1");
expectedInputRefs.put("database_a.table_a2@cluster1", "table_a2");
expectedInputRefs.put("database_a.table_a1@namespace1", "table_a1");
expectedInputRefs.put("database_a.table_a2@namespace1", "table_a2");
for (Referenceable ref : refs.getInputs()) {
final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
assertTrue(expectedInputRefs.containsKey(qName));
@ -115,7 +115,7 @@ public class TestHive2JDBC {
Referenceable ref = refs.getOutputs().iterator().next();
assertEquals("hive_table", ref.getTypeName());
assertEquals("table_b1", ref.get(ATTR_NAME));
assertEquals("database_b.table_b1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
assertEquals("database_b.table_b1@namespace1", ref.get(ATTR_QUALIFIED_NAME));
}
/**
@ -134,11 +134,11 @@ public class TestHive2JDBC {
when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("table_A1, table_A2");
when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_B.table_B1");
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
assertNotNull(analyzer);
@ -147,8 +147,8 @@ public class TestHive2JDBC {
assertEquals(2, refs.getInputs().size());
// QualifiedName : Name
final Map<String, String> expectedInputRefs = new HashMap<>();
expectedInputRefs.put("default.table_a1@cluster1", "table_a1");
expectedInputRefs.put("default.table_a2@cluster1", "table_a2");
expectedInputRefs.put("default.table_a1@namespace1", "table_a1");
expectedInputRefs.put("default.table_a2@namespace1", "table_a2");
for (Referenceable ref : refs.getInputs()) {
final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
assertTrue(expectedInputRefs.containsKey(qName));
@ -159,7 +159,7 @@ public class TestHive2JDBC {
Referenceable ref = refs.getOutputs().iterator().next();
assertEquals("hive_table", ref.getTypeName());
assertEquals("table_b1", ref.get(ATTR_NAME));
assertEquals("database_b.table_b1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
assertEquals("database_b.table_b1@namespace1", ref.get(ATTR_QUALIFIED_NAME));
}
/**
@ -177,11 +177,11 @@ public class TestHive2JDBC {
when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("table_A1, table_A2");
when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_B.table_B1");
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
assertNotNull(analyzer);
@ -190,8 +190,8 @@ public class TestHive2JDBC {
assertEquals(2, refs.getInputs().size());
// QualifiedName : Name
final Map<String, String> expectedInputRefs = new HashMap<>();
expectedInputRefs.put("default.table_a1@cluster1", "table_a1");
expectedInputRefs.put("default.table_a2@cluster1", "table_a2");
expectedInputRefs.put("default.table_a1@namespace1", "table_a1");
expectedInputRefs.put("default.table_a2@namespace1", "table_a2");
for (Referenceable ref : refs.getInputs()) {
final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
assertTrue(expectedInputRefs.containsKey(qName));
@ -202,7 +202,7 @@ public class TestHive2JDBC {
Referenceable ref = refs.getOutputs().iterator().next();
assertEquals("hive_table", ref.getTypeName());
assertEquals("table_b1", ref.get(ATTR_NAME));
assertEquals("database_b.table_b1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
assertEquals("database_b.table_b1@namespace1", ref.get(ATTR_QUALIFIED_NAME));
}
/**
@ -222,11 +222,11 @@ public class TestHive2JDBC {
when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("table_A1, table_A2");
when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_B.table_B1");
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"), eq("2.example.com"))).thenReturn("cluster1");
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"), eq("2.example.com"))).thenReturn("namespace1");
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
assertNotNull(analyzer);
@ -235,8 +235,8 @@ public class TestHive2JDBC {
assertEquals(2, refs.getInputs().size());
// QualifiedName : Name
final Map<String, String> expectedInputRefs = new HashMap<>();
expectedInputRefs.put("default.table_a1@cluster1", "table_a1");
expectedInputRefs.put("default.table_a2@cluster1", "table_a2");
expectedInputRefs.put("default.table_a1@namespace1", "table_a1");
expectedInputRefs.put("default.table_a2@namespace1", "table_a2");
for (Referenceable ref : refs.getInputs()) {
final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
assertTrue(expectedInputRefs.containsKey(qName));
@ -247,7 +247,7 @@ public class TestHive2JDBC {
Referenceable ref = refs.getOutputs().iterator().next();
assertEquals("hive_table", ref.getTypeName());
assertEquals("table_b1", ref.get(ATTR_NAME));
assertEquals("database_b.table_b1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
assertEquals("database_b.table_b1@namespace1", ref.get(ATTR_QUALIFIED_NAME));
}
/**
@ -265,11 +265,11 @@ public class TestHive2JDBC {
when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("table_A1, table_A2");
when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_B.table_B1");
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"))).thenReturn("cluster1");
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"))).thenReturn("namespace1");
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
assertNotNull(analyzer);
@ -278,8 +278,8 @@ public class TestHive2JDBC {
assertEquals(2, refs.getInputs().size());
// QualifiedName : Name
final Map<String, String> expectedInputRefs = new HashMap<>();
expectedInputRefs.put("some_database.table_a1@cluster1", "table_a1");
expectedInputRefs.put("some_database.table_a2@cluster1", "table_a2");
expectedInputRefs.put("some_database.table_a1@namespace1", "table_a1");
expectedInputRefs.put("some_database.table_a2@namespace1", "table_a2");
for (Referenceable ref : refs.getInputs()) {
final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
assertTrue(expectedInputRefs.containsKey(qName));
@ -290,7 +290,7 @@ public class TestHive2JDBC {
Referenceable ref = refs.getOutputs().iterator().next();
assertEquals("hive_table", ref.getTypeName());
assertEquals("table_b1", ref.get(ATTR_NAME));
assertEquals("database_b.table_b1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
assertEquals("database_b.table_b1@namespace1", ref.get(ATTR_QUALIFIED_NAME));
}
}

View File

@ -21,7 +21,7 @@ import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
import org.apache.nifi.atlas.resolver.ClusterResolvers;
import org.apache.nifi.atlas.resolver.NamespaceResolvers;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.junit.Test;
@ -46,11 +46,11 @@ public class TestKafkaTopic {
when(record.getTransitUri()).thenReturn(transitUri);
when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
assertNotNull(analyzer);
@ -61,7 +61,7 @@ public class TestKafkaTopic {
Referenceable ref = refs.getOutputs().iterator().next();
assertEquals("topicA", ref.get(ATTR_NAME));
assertEquals("topicA", ref.get("topic"));
assertEquals("topicA@cluster1", ref.get(ATTR_QUALIFIED_NAME));
assertEquals("topicA@namespace1", ref.get(ATTR_QUALIFIED_NAME));
}
@Test
@ -73,11 +73,11 @@ public class TestKafkaTopic {
when(record.getTransitUri()).thenReturn(transitUri);
when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"))).thenReturn("cluster1");
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"))).thenReturn("namespace1");
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
assertNotNull(analyzer);
@ -88,7 +88,7 @@ public class TestKafkaTopic {
Referenceable ref = refs.getOutputs().iterator().next();
assertEquals("topicA", ref.get(ATTR_NAME));
assertEquals("topicA", ref.get("topic"));
assertEquals("topicA@cluster1", ref.get(ATTR_QUALIFIED_NAME));
assertEquals("topicA@namespace1", ref.get(ATTR_QUALIFIED_NAME));
}
@Test
@ -100,11 +100,11 @@ public class TestKafkaTopic {
when(record.getTransitUri()).thenReturn(transitUri);
when(record.getEventType()).thenReturn(ProvenanceEventType.RECEIVE);
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
assertNotNull(analyzer);
@ -116,7 +116,7 @@ public class TestKafkaTopic {
assertEquals("kafka_topic", ref.getTypeName());
assertEquals("topicA", ref.get(ATTR_NAME));
assertEquals("topicA", ref.get("topic"));
assertEquals("topicA@cluster1", ref.get(ATTR_QUALIFIED_NAME));
assertEquals("topicA@namespace1", ref.get(ATTR_QUALIFIED_NAME));
}
@Test
@ -128,11 +128,11 @@ public class TestKafkaTopic {
when(record.getTransitUri()).thenReturn(transitUri);
when(record.getEventType()).thenReturn(ProvenanceEventType.RECEIVE);
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
assertNotNull(analyzer);
@ -144,7 +144,7 @@ public class TestKafkaTopic {
assertEquals("kafka_topic", ref.getTypeName());
assertEquals("topicA", ref.get(ATTR_NAME));
assertEquals("topicA", ref.get("topic"));
assertEquals("topicA@cluster1", ref.get(ATTR_QUALIFIED_NAME));
assertEquals("topicA@namespace1", ref.get(ATTR_QUALIFIED_NAME));
}
}

View File

@ -22,7 +22,7 @@ import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
import org.apache.nifi.atlas.reporting.ITReportLineageToAtlas;
import org.apache.nifi.atlas.resolver.ClusterResolvers;
import org.apache.nifi.atlas.resolver.NamespaceResolvers;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
import org.apache.nifi.provenance.ProvenanceEventRecord;
@ -60,8 +60,8 @@ public class TestNiFiRemotePort {
when(sendEvent.getTransitUri()).thenReturn(transitUri);
when(sendEvent.getEventType()).thenReturn(ProvenanceEventType.SEND);
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");
final List<ConnectionStatus> connections = new ArrayList<>();
final ConnectionStatus connection = new ConnectionStatus();
@ -70,7 +70,7 @@ public class TestNiFiRemotePort {
connections.add(connection);
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
when(context.findConnectionTo(matches("port-guid"))).thenReturn(connections);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, sendEvent.getEventType());
@ -86,7 +86,7 @@ public class TestNiFiRemotePort {
Referenceable ref = refs.getOutputs().iterator().next();
assertEquals(TYPE_NIFI_INPUT_PORT, ref.getTypeName());
assertEquals("inputPortA", ref.get(ATTR_NAME));
assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
assertEquals("port-guid@namespace1", ref.get(ATTR_QUALIFIED_NAME));
}
@Test
@ -99,8 +99,8 @@ public class TestNiFiRemotePort {
when(record.getTransitUri()).thenReturn(transitUri);
when(record.getEventType()).thenReturn(ProvenanceEventType.RECEIVE);
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");
final List<ConnectionStatus> connections = new ArrayList<>();
final ConnectionStatus connection = new ConnectionStatus();
@ -109,7 +109,7 @@ public class TestNiFiRemotePort {
connections.add(connection);
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
when(context.findConnectionFrom(matches("port-guid"))).thenReturn(connections);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, record.getEventType());
@ -121,7 +121,7 @@ public class TestNiFiRemotePort {
Referenceable ref = refs.getInputs().iterator().next();
assertEquals(TYPE_NIFI_OUTPUT_PORT, ref.getTypeName());
assertEquals("outputPortA", ref.get(ATTR_NAME));
assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
assertEquals("port-guid@namespace1", ref.get(ATTR_QUALIFIED_NAME));
}
@Test
@ -138,8 +138,8 @@ public class TestNiFiRemotePort {
when(sendEvent.getEventType()).thenReturn(ProvenanceEventType.SEND);
when(sendEvent.getAttribute(SiteToSiteAttributes.S2S_PORT_ID.key())).thenReturn("remote-port-guid");
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");
final List<ConnectionStatus> connections = new ArrayList<>();
final ConnectionStatus connection = new ConnectionStatus();
@ -148,7 +148,7 @@ public class TestNiFiRemotePort {
connections.add(connection);
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
when(context.findConnectionTo(matches("s2s-client-component-guid"))).thenReturn(connections);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, sendEvent.getEventType());
@ -164,7 +164,7 @@ public class TestNiFiRemotePort {
Referenceable ref = refs.getOutputs().iterator().next();
assertEquals(TYPE_NIFI_INPUT_PORT, ref.getTypeName());
assertEquals("inputPortA", ref.get(ATTR_NAME));
assertEquals("remote-port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
assertEquals("remote-port-guid@namespace1", ref.get(ATTR_QUALIFIED_NAME));
}
@Test
@ -180,8 +180,8 @@ public class TestNiFiRemotePort {
when(record.getEventType()).thenReturn(ProvenanceEventType.RECEIVE);
when(record.getAttribute(SiteToSiteAttributes.S2S_PORT_ID.key())).thenReturn("remote-port-guid");
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");
final List<ConnectionStatus> connections = new ArrayList<>();
final ConnectionStatus connection = new ConnectionStatus();
@ -190,7 +190,7 @@ public class TestNiFiRemotePort {
connections.add(connection);
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
when(context.findConnectionFrom(matches("s2s-client-component-guid"))).thenReturn(connections);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, record.getEventType());
@ -202,7 +202,7 @@ public class TestNiFiRemotePort {
Referenceable ref = refs.getInputs().iterator().next();
assertEquals(TYPE_NIFI_OUTPUT_PORT, ref.getTypeName());
assertEquals("outputPortA", ref.get(ATTR_NAME));
assertEquals("remote-port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
assertEquals("remote-port-guid@namespace1", ref.get(ATTR_QUALIFIED_NAME));
}
}

View File

@ -22,7 +22,7 @@ import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
import org.apache.nifi.atlas.reporting.ITReportLineageToAtlas;
import org.apache.nifi.atlas.resolver.ClusterResolvers;
import org.apache.nifi.atlas.resolver.NamespaceResolvers;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
@ -59,8 +59,8 @@ public class TestNiFiRootGroupPort {
when(receiveEvent.getTransitUri()).thenReturn(transitUri);
when(receiveEvent.getEventType()).thenReturn(ProvenanceEventType.RECEIVE);
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");
final List<ConnectionStatus> connections = new ArrayList<>();
final ConnectionStatus connection = new ConnectionStatus();
@ -69,7 +69,7 @@ public class TestNiFiRootGroupPort {
connections.add(connection);
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
when(context.findConnectionFrom(matches("port-guid"))).thenReturn(connections);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, receiveEvent.getEventType());
@ -85,7 +85,7 @@ public class TestNiFiRootGroupPort {
Referenceable ref = refs.getInputs().iterator().next();
assertEquals(TYPE_NIFI_INPUT_PORT, ref.getTypeName());
assertEquals("inputPortA", ref.get(ATTR_NAME));
assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
assertEquals("port-guid@namespace1", ref.get(ATTR_QUALIFIED_NAME));
}
@Test
@ -98,8 +98,8 @@ public class TestNiFiRootGroupPort {
when(sendEvent.getTransitUri()).thenReturn(transitUri);
when(sendEvent.getEventType()).thenReturn(ProvenanceEventType.SEND);
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");
final List<ConnectionStatus> connections = new ArrayList<>();
final ConnectionStatus connection = new ConnectionStatus();
@ -108,7 +108,7 @@ public class TestNiFiRootGroupPort {
connections.add(connection);
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
when(context.findConnectionTo(matches("port-guid"))).thenReturn(connections);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, sendEvent.getEventType());
@ -120,7 +120,7 @@ public class TestNiFiRootGroupPort {
Referenceable ref = refs.getOutputs().iterator().next();
assertEquals(TYPE_NIFI_OUTPUT_PORT, ref.getTypeName());
assertEquals("outputPortA", ref.get(ATTR_NAME));
assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
assertEquals("port-guid@namespace1", ref.get(ATTR_QUALIFIED_NAME));
}
@Test
@ -135,8 +135,8 @@ public class TestNiFiRootGroupPort {
when(receiveEvent.getTransitUri()).thenReturn(transitUri);
when(receiveEvent.getEventType()).thenReturn(ProvenanceEventType.RECEIVE);
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");
final List<ConnectionStatus> connections = new ArrayList<>();
final ConnectionStatus connection = new ConnectionStatus();
@ -145,7 +145,7 @@ public class TestNiFiRootGroupPort {
connections.add(connection);
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
when(context.findConnectionFrom(matches("port-guid"))).thenReturn(connections);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, receiveEvent.getEventType());
@ -161,7 +161,7 @@ public class TestNiFiRootGroupPort {
Referenceable ref = refs.getInputs().iterator().next();
assertEquals(TYPE_NIFI_INPUT_PORT, ref.getTypeName());
assertEquals("inputPortA", ref.get(ATTR_NAME));
assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
assertEquals("port-guid@namespace1", ref.get(ATTR_QUALIFIED_NAME));
}
@Test
@ -175,8 +175,8 @@ public class TestNiFiRootGroupPort {
when(sendEvent.getTransitUri()).thenReturn(transitUri);
when(sendEvent.getEventType()).thenReturn(ProvenanceEventType.SEND);
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");
final List<ConnectionStatus> connections = new ArrayList<>();
final ConnectionStatus connection = new ConnectionStatus();
@ -185,7 +185,7 @@ public class TestNiFiRootGroupPort {
connections.add(connection);
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
when(context.findConnectionTo(matches("port-guid"))).thenReturn(connections);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, sendEvent.getEventType());
@ -197,7 +197,7 @@ public class TestNiFiRootGroupPort {
Referenceable ref = refs.getOutputs().iterator().next();
assertEquals(TYPE_NIFI_OUTPUT_PORT, ref.getTypeName());
assertEquals("outputPortA", ref.get(ATTR_NAME));
assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
assertEquals("port-guid@namespace1", ref.get(ATTR_QUALIFIED_NAME));
}
}
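
Every hunk above applies the same mechanical rename: ClusterResolvers becomes NamespaceResolvers, getClusterResolver() becomes getNamespaceResolver(), and the qualified-name suffix switches from a cluster name to a namespace. A minimal sketch of the shared Mockito wiring the updated tests rely on (NamespaceResolvers, AnalysisContext, and their accessors come from the hunks; the sketch's own class and method names are illustrative):

import static org.mockito.ArgumentMatchers.matches;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.resolver.NamespaceResolvers;
import org.junit.Test;

public class NamespaceMockWiringSketch {

    @Test
    public void wiresNamespaceResolverIntoAnalysisContext() {
        // Any *.example.com host resolves to the namespace asserted above.
        final NamespaceResolvers namespaceResolvers = mock(NamespaceResolvers.class);
        when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");

        // The analysis context now hands out the resolver via getNamespaceResolver().
        final AnalysisContext context = mock(AnalysisContext.class);
        when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
    }
}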

View File

@ -21,7 +21,7 @@ import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
import org.apache.nifi.atlas.resolver.ClusterResolvers;
import org.apache.nifi.atlas.resolver.NamespaceResolvers;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.junit.Test;
@ -55,11 +55,11 @@ public class TestPutHiveStreaming {
when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_A.table_A");
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
assertNotNull(analyzer);
@ -70,6 +70,6 @@ public class TestPutHiveStreaming {
Referenceable ref = refs.getOutputs().iterator().next();
assertEquals("hive_table", ref.getTypeName());
assertEquals("table_a", ref.get(ATTR_NAME));
assertEquals("database_a.table_a@cluster1", ref.get(ATTR_QUALIFIED_NAME));
assertEquals("database_a.table_a@namespace1", ref.get(ATTR_QUALIFIED_NAME));
}
}
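
The assertion pair above pins down the Hive qualified-name convention: the table reference from ATTR_OUTPUT_TABLES is lower-cased and suffixed with '@' plus the resolved namespace. A hypothetical helper (not part of the codebase) that reproduces the expected value:

public class QualifiedNameSketch {

    // Hypothetical helper: lower-case the table reference, append '@' and the namespace.
    static String toQualifiedName(String tableName, String namespace) {
        return tableName.toLowerCase() + "@" + namespace;
    }

    public static void main(String[] args) {
        // Prints "database_a.table_a@namespace1", matching the assertion above.
        System.out.println(toQualifiedName("database_A.table_A", "namespace1"));
    }
}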

View File

@ -21,7 +21,7 @@ import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
import org.apache.nifi.atlas.resolver.ClusterResolvers;
import org.apache.nifi.atlas.resolver.NamespaceResolvers;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
@ -50,15 +50,15 @@ public class TestUnknownDataSet {
when(record.getComponentId()).thenReturn(processorId);
when(record.getEventType()).thenReturn(ProvenanceEventType.CREATE);
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");
final List<ConnectionStatus> connections = new ArrayList<>();
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
when(context.findConnectionTo(processorId)).thenReturn(connections);
when(context.getNiFiClusterName()).thenReturn("nifi-cluster");
when(context.getNiFiNamespace()).thenReturn("test_namespace");
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, null, record.getEventType());
assertNotNull(analyzer);
@ -69,7 +69,7 @@ public class TestUnknownDataSet {
Referenceable ref = refs.getInputs().iterator().next();
assertEquals("nifi_data", ref.getTypeName());
assertEquals("GenerateFlowFile", ref.get(ATTR_NAME));
assertEquals("processor-1234@nifi-cluster", ref.get(ATTR_QUALIFIED_NAME));
assertEquals("processor-1234@test_namespace", ref.get(ATTR_QUALIFIED_NAME));
}
@Test
@ -81,15 +81,15 @@ public class TestUnknownDataSet {
when(record.getComponentId()).thenReturn(processorId);
when(record.getEventType()).thenReturn(ProvenanceEventType.CREATE);
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");
final List<ConnectionStatus> connections = new ArrayList<>();
// The content of the connection is not important; an empty status suffices.
connections.add(new ConnectionStatus());
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getClusterResolver()).thenReturn(clusterResolvers);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
when(context.findConnectionTo(processorId)).thenReturn(connections);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, null, record.getEventType());
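
The two tests above differ only in whether the processor has an incoming connection: with none, the analyzer synthesizes a generic nifi_data input qualified with the reporting task's own namespace from getNiFiNamespace(). A hedged sketch of that decision; the helper names are invented for illustration, and only findConnectionTo and getNiFiNamespace come from the hunks:

import java.util.List;

import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.controller.status.ConnectionStatus;

public class UnknownDataSetSketch {

    // Illustrative: a synthetic input is only warranted when nothing flows into the processor.
    static boolean needsUnknownInput(AnalysisContext context, String processorId) {
        final List<ConnectionStatus> incoming = context.findConnectionTo(processorId);
        return incoming == null || incoming.isEmpty();
    }

    // Illustrative: combine the component id with the task's namespace,
    // e.g. "processor-1234@test_namespace" as asserted above.
    static String qualifiedName(AnalysisContext context, String processorId) {
        return processorId + "@" + context.getNiFiNamespace();
    }
}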

View File

@ -77,7 +77,7 @@ import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_PASSWOR
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_URLS;
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_USER;
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.LINEAGE_STRATEGY_COMPLETE_PATH;
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_LINEAGE_STRATEGY;
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.LINEAGE_STRATEGY;
import static org.apache.nifi.atlas.reporting.SimpleProvenanceRecord.pr;
import static org.apache.nifi.provenance.ProvenanceEventType.ATTRIBUTES_MODIFIED;
import static org.apache.nifi.provenance.ProvenanceEventType.CREATE;
@ -778,7 +778,7 @@ public class ITReportLineageToAtlas {
@Test
public void testS2SSendComplete() throws Exception {
final TestConfiguration tc = new TestConfiguration("S2SSend");
tc.properties.put(NIFI_LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue());
tc.properties.put(LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue());
testS2SSend(tc);
@ -837,7 +837,7 @@ public class ITReportLineageToAtlas {
@Test
public void testS2SSendCompleteRAW() throws Exception {
final TestConfiguration tc = new TestConfiguration("S2SSendRAW");
tc.properties.put(NIFI_LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue());
tc.properties.put(LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue());
testS2SSendRAW(tc);
@ -1294,7 +1294,7 @@ public class ITReportLineageToAtlas {
@Test
public void testSimpleEventLevelCompletePath() throws Exception {
final TestConfiguration tc = new TestConfiguration("SimpleEventLevel");
tc.properties.put(NIFI_LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue());
tc.properties.put(LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue());
final ProvenanceRecords prs = tc.provenanceRecords;
String flowFileUUIDA = "A0000000-0000-0000";
@ -1339,7 +1339,7 @@ public class ITReportLineageToAtlas {
@Test
public void testMergedEvents() throws Exception {
final TestConfiguration tc = new TestConfiguration("MergedEvents");
tc.properties.put(NIFI_LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue());
tc.properties.put(LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue());
final ProvenanceRecords prs = tc.provenanceRecords;
final String flowFileUUIDA = "A0000000-0000-0000";
final String flowFileUUIDB = "B0000000-0000-0000";
@ -1422,7 +1422,7 @@ public class ITReportLineageToAtlas {
@Test
public void testRecordAndDataSetLevel() throws Exception {
final TestConfiguration tc = new TestConfiguration("RecordAndDataSetLevel");
tc.properties.put(NIFI_LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue());
tc.properties.put(LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue());
final ProvenanceRecords prs = tc.provenanceRecords;
// Publish part
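
The only change in this integration test is the constant rename NIFI_LINEAGE_STRATEGY -> LINEAGE_STRATEGY; the configured value is untouched. For reference, opting a test into complete-path lineage after the rename looks like this (a sketch that assumes compilation alongside TestConfiguration, the helper class already used throughout this file):

import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.LINEAGE_STRATEGY;
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.LINEAGE_STRATEGY_COMPLETE_PATH;

public class LineageStrategySketch {

    void configureCompletePath() {
        // Select the complete-path strategy under the renamed descriptor.
        final TestConfiguration tc = new TestConfiguration("S2SSend");
        tc.properties.put(LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue());
    }
}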

View File

@ -39,10 +39,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@ -63,6 +66,7 @@ import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.KAFKA_BOOTSTR
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -86,7 +90,7 @@ public class TestReportLineageToAtlas {
}
@Test
public void validateAtlasUrls() throws Exception {
public void validateAtlasUrlsFromProperty() throws Exception {
final MockProcessContext processContext = new MockProcessContext(testSubject);
final MockValidationContext validationContext = new MockValidationContext(processContext);
@ -105,10 +109,11 @@ public class TestReportLineageToAtlas {
}
};
// Default setting.
assertResults.accept(testSubject.validate(validationContext),
r -> assertTrue("Atlas URLs is required", !r.isValid()));
// Default setting or empty URLs
assertTrue(processContext.isValid());
processContext.removeProperty(ATLAS_URLS);
assertTrue(processContext.isValid());
// Invalid URL.
processContext.setProperty(ATLAS_URLS, "invalid");
@ -133,6 +138,165 @@ public class TestReportLineageToAtlas {
r -> assertTrue("Atlas URLs is invalid", !r.isValid()));
}
@Test
public void validateNoAtlasUrlsFromConfig() throws Exception {
// GIVEN
Properties atlasConf = new Properties();
Consumer<Exception> assertion = e -> assertEquals(
"No Atlas URL has been specified! Set either the 'Atlas URLs' property on the processor or the 'atlas.rest.address' property in the atlas configuration file.",
e.getMessage()
);
// WHEN
// THEN
validateAtlasUrlsFromConfig(atlasConf, assertion);
}
@Test
public void validateNoProtocolAtlasUrlsFromConfig() throws Exception {
// GIVEN
String atlasUrls = "noProtocolUrl, https://atlasUrl";
Properties atlasConf = new Properties();
atlasConf.setProperty("atlas.rest.address", atlasUrls);
Consumer<Exception> assertion = e -> assertTrue(
"Expected " + MalformedURLException.class.getSimpleName() + " for " + atlasUrls + ", got " + e,
e.getCause() instanceof MalformedURLException
);
// WHEN
// THEN
validateAtlasUrlsFromConfig(atlasConf, assertion);
}
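
validateNoProtocolAtlasUrlsFromConfig expects setup to fail with a MalformedURLException cause when an entry in the comma-separated 'atlas.rest.address' value lacks a protocol. A minimal sketch of a check with that behavior, using an illustrative helper rather than the reporting task's actual parsing code:

import java.net.MalformedURLException;
import java.net.URL;

public class AtlasUrlValidationSketch {

    // Each comma-separated entry must parse as a URL; "noProtocolUrl" throws
    // MalformedURLException while "https://atlasUrl" parses cleanly.
    static void validateUrls(String commaSeparatedUrls) throws MalformedURLException {
        for (String url : commaSeparatedUrls.split(",")) {
            new URL(url.trim());
        }
    }

    public static void main(String[] args) throws MalformedURLException {
        validateUrls("noProtocolUrl, https://atlasUrl"); // fails on the first entry
    }
}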
private void validateAtlasUrlsFromConfig(Properties atlasConf, Consumer<Exception> exceptionConsumer) throws Exception {
// GIVEN
Consumer<Map<PropertyDescriptor, String>> propertiesAdjustment = properties -> {
properties.put(ATLAS_CONF_CREATE, "false");
properties.remove(ATLAS_URLS);
};
// WHEN
// THEN
testSetup(
atlasConf,
propertiesAdjustment,
() -> fail(),
e -> {
assertTrue("Expected a " + ProcessException.class.getSimpleName() + ", got " + e, e instanceof ProcessException);
exceptionConsumer.accept(e);
}
);
}
@Test
public void testCreateAtlasPropertiesWithAtlasURLs() throws Exception {
// GIVEN
String atlasUrls = "http://atlasUrl1,http://atlasUrl2";
Properties atlasConf = new Properties();
Consumer<Map<PropertyDescriptor, String>> propertiesAdjustment = properties -> {
properties.put(ATLAS_CONF_CREATE, "true");
properties.put(ATLAS_URLS, atlasUrls);
};
Runnable assertion = () -> {
Properties atlasProperties = new Properties();
final File atlasPropertiesFile = new File("target/atlasConfDir", "atlas-application.properties");
try (InputStream in = new FileInputStream(atlasPropertiesFile)) {
atlasProperties.load(in);
} catch (Exception e) {
throw new AssertionError(e);
}
assertEquals(atlasUrls, atlasProperties.getProperty("atlas.rest.address"));
};
// WHEN
// THEN
testSetup(
atlasConf,
propertiesAdjustment,
assertion,
e -> {
throw new AssertionError(e);
}
);
}
@Test
public void testCreateAtlasPropertiesWithMetadataNamespace() throws Exception {
// GIVEN
String atlasMetadataNamespace = "namespace";
Properties atlasConf = new Properties();
Consumer<Map<PropertyDescriptor, String>> propertiesAdjustment = properties -> {
properties.put(ATLAS_CONF_CREATE, "true");
properties.put(ATLAS_DEFAULT_CLUSTER_NAME, atlasMetadataNamespace);
};
Runnable assertion = () -> {
Properties atlasProperties = new Properties();
final File atlasPropertiesFile = new File("target/atlasConfDir", "atlas-application.properties");
try (InputStream in = new FileInputStream(atlasPropertiesFile)) {
atlasProperties.load(in);
} catch (Exception e) {
throw new AssertionError(e);
}
assertEquals(atlasMetadataNamespace, atlasProperties.getProperty("atlas.metadata.namespace"));
};
// WHEN
// THEN
testSetup(
atlasConf,
propertiesAdjustment,
assertion,
e -> {
throw new AssertionError(e);
}
);
}
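
The pair of testCreateAtlasProperties tests verifies that values entered on the reporting task surface in the generated atlas-application.properties: the URLs under 'atlas.rest.address', and the namespace, although still supplied through the descriptor named ATLAS_DEFAULT_CLUSTER_NAME, under the new 'atlas.metadata.namespace' key. A sketch of reading the generated file back, mirroring the assertion blocks (the path is the one these tests use):

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;

public class GeneratedAtlasConfSketch {

    public static void main(String[] args) throws Exception {
        // Location written by the reporting task in these tests.
        final File file = new File("target/atlasConfDir", "atlas-application.properties");
        final Properties atlasProperties = new Properties();
        try (InputStream in = new FileInputStream(file)) {
            atlasProperties.load(in);
        }
        // The namespace lands under the new key, the URLs under 'atlas.rest.address'.
        System.out.println(atlasProperties.getProperty("atlas.metadata.namespace"));
        System.out.println(atlasProperties.getProperty("atlas.rest.address"));
    }
}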
private void testSetup(
Properties atlasConf,
Consumer<Map<PropertyDescriptor, String>> propertiesAdjustment,
Runnable onSuccess, Consumer<Exception> exceptionConsumer
) throws Exception {
// GIVEN
String atlasConfDir = createAtlasConfDir();
Map<PropertyDescriptor, String> properties = initReportingTaskProperties(atlasConfDir);
propertiesAdjustment.accept(properties);
saveAtlasConf(atlasConfDir, atlasConf);
reportingContext = mock(ReportingContext.class);
when(reportingContext.getProperties()).thenReturn(properties);
when(reportingContext.getProperty(any())).then(invocation -> new MockPropertyValue(properties.get(invocation.getArguments()[0])));
ConfigurationContext configurationContext = new MockConfigurationContext(properties, null);
testSubject.initialize(initializationContext);
// WHEN
try {
testSubject.setup(configurationContext);
onSuccess.run();
// THEN
} catch (Exception e) {
exceptionConsumer.accept(e);
}
}
@Test
public void testDefaultConnectAndReadTimeout() throws Exception {
// GIVEN

View File

@ -30,7 +30,7 @@ import java.util.Map;
import static org.mockito.Mockito.when;
public class TestRegexClusterResolver {
public class TestRegexNamespaceResolver {
private PropertyContext context;
private ValidationContext validationContext;
@ -45,7 +45,7 @@ public class TestRegexClusterResolver {
@Test
public void testEmptySettings() {
setupMock(Collections.EMPTY_MAP);
final RegexClusterResolver resolver = new RegexClusterResolver();
final RegexNamespaceResolver resolver = new RegexNamespaceResolver();
// It should be valid
final Collection<ValidationResult> validationResults = resolver.validate(validationContext);
@ -56,16 +56,16 @@ public class TestRegexClusterResolver {
}
@Test
public void testInvalidClusterName() {
public void testInvalidNamespace() {
final Map<String, String> properties = new HashMap<>();
properties.put(RegexClusterResolver.PATTERN_PROPERTY_PREFIX, ".*\\.example.com");
properties.put(RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX, ".*\\.example.com");
setupMock(properties);
final RegexClusterResolver resolver = new RegexClusterResolver();
final RegexNamespaceResolver resolver = new RegexNamespaceResolver();
final Collection<ValidationResult> validationResults = resolver.validate(validationContext);
Assert.assertEquals(1, validationResults.size());
final ValidationResult validationResult = validationResults.iterator().next();
Assert.assertEquals(RegexClusterResolver.PATTERN_PROPERTY_PREFIX, validationResult.getSubject());
Assert.assertEquals(RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX, validationResult.getSubject());
try {
resolver.configure(context);
@ -77,10 +77,10 @@ public class TestRegexClusterResolver {
@Test
public void testEmptyPattern() {
final Map<String, String> properties = new HashMap<>();
final String propertyName = RegexClusterResolver.PATTERN_PROPERTY_PREFIX + "Cluster1";
final String propertyName = RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX + "Namespace1";
properties.put(propertyName, "");
setupMock(properties);
final RegexClusterResolver resolver = new RegexClusterResolver();
final RegexNamespaceResolver resolver = new RegexNamespaceResolver();
final Collection<ValidationResult> validationResults = resolver.validate(validationContext);
Assert.assertEquals(1, validationResults.size());
@ -97,61 +97,64 @@ public class TestRegexClusterResolver {
@Test
public void testSinglePattern() {
final Map<String, String> properties = new HashMap<>();
final String propertyName = RegexClusterResolver.PATTERN_PROPERTY_PREFIX + "Cluster1";
final String propertyName = RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX + "Namespace1";
properties.put(propertyName, "^.*\\.example.com$");
setupMock(properties);
final RegexClusterResolver resolver = new RegexClusterResolver();
final RegexNamespaceResolver resolver = new RegexNamespaceResolver();
final Collection<ValidationResult> validationResults = resolver.validate(validationContext);
Assert.assertEquals(0, validationResults.size());
resolver.configure(context);
Assert.assertEquals("Cluster1", resolver.fromHostNames("host1.example.com"));
Assert.assertEquals("Namespace1", resolver.fromHostNames("host1.example.com"));
}
@Test
public void testMultiplePatterns() {
final Map<String, String> properties = new HashMap<>();
final String propertyName = RegexClusterResolver.PATTERN_PROPERTY_PREFIX + "Cluster1";
final String propertyName = RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX + "Namespace1";
// Hostname or local IP address patterns, delimited by whitespace
properties.put(propertyName, "^.*\\.example.com$\n^192.168.1.[\\d]+$");
setupMock(properties);
final RegexClusterResolver resolver = new RegexClusterResolver();
final RegexNamespaceResolver resolver = new RegexNamespaceResolver();
final Collection<ValidationResult> validationResults = resolver.validate(validationContext);
Assert.assertEquals(0, validationResults.size());
resolver.configure(context);
Assert.assertEquals("Cluster1", resolver.fromHostNames("host1.example.com"));
Assert.assertEquals("Cluster1", resolver.fromHostNames("192.168.1.10"));
Assert.assertEquals("Cluster1", resolver.fromHostNames("192.168.1.22"));
Assert.assertEquals("Namespace1", resolver.fromHostNames("host1.example.com"));
Assert.assertEquals("Namespace1", resolver.fromHostNames("192.168.1.10"));
Assert.assertEquals("Namespace1", resolver.fromHostNames("192.168.1.22"));
Assert.assertNull(resolver.fromHostNames("192.168.2.30"));
}
@Test
public void testMultipleClusters() {
public void testMultipleNamespaces() {
String namespace1 = "Namespace1";
String namespace2 = "Namespace2";
final Map<String, String> properties = new HashMap<>();
final String c1PropertyName = RegexClusterResolver.PATTERN_PROPERTY_PREFIX + "Cluster1";
final String c2PropertyName = RegexClusterResolver.PATTERN_PROPERTY_PREFIX + "Cluster2";
final String namespace1PropertyName = RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX + namespace1;
final String namespace2PropertyName = RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX + namespace2;
// Hostname or local IP address
properties.put(c1PropertyName, "^.*\\.c1\\.example.com$ ^192.168.1.[\\d]+$");
properties.put(c2PropertyName, "^.*\\.c2\\.example.com$ ^192.168.2.[\\d]+$");
properties.put(namespace1PropertyName, "^.*\\.c1\\.example.com$ ^192.168.1.[\\d]+$");
properties.put(namespace2PropertyName, "^.*\\.c2\\.example.com$ ^192.168.2.[\\d]+$");
setupMock(properties);
final RegexClusterResolver resolver = new RegexClusterResolver();
final RegexNamespaceResolver resolver = new RegexNamespaceResolver();
final Collection<ValidationResult> validationResults = resolver.validate(validationContext);
Assert.assertEquals(0, validationResults.size());
resolver.configure(context);
Assert.assertEquals("Cluster1", resolver.fromHostNames("host1.c1.example.com"));
Assert.assertEquals("Cluster1", resolver.fromHostNames("192.168.1.10"));
Assert.assertEquals("Cluster1", resolver.fromHostNames("192.168.1.22"));
Assert.assertEquals("Cluster2", resolver.fromHostNames("host2.c2.example.com"));
Assert.assertEquals("Cluster2", resolver.fromHostNames("192.168.2.10"));
Assert.assertEquals("Cluster2", resolver.fromHostNames("192.168.2.22"));
Assert.assertEquals(namespace1, resolver.fromHostNames("host1.c1.example.com"));
Assert.assertEquals(namespace1, resolver.fromHostNames("192.168.1.10"));
Assert.assertEquals(namespace1, resolver.fromHostNames("192.168.1.22"));
Assert.assertEquals(namespace2, resolver.fromHostNames("host2.c2.example.com"));
Assert.assertEquals(namespace2, resolver.fromHostNames("192.168.2.10"));
Assert.assertEquals(namespace2, resolver.fromHostNames("192.168.2.22"));
Assert.assertNull(resolver.fromHostNames("192.168.3.30"));
}
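
Taken together, these tests pin down the resolver's contract: each dynamic property maps one namespace to one or more regular expressions delimited by whitespace (spaces or newlines), and a hostname resolves to the first namespace with a matching pattern, or null when nothing matches. A hedged sketch of that matching logic; only the fromHostNames semantics come from the tests, the method shape here is illustrative:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class RegexNamespaceMatchingSketch {

    // Illustrative core of the contract asserted above: first matching namespace wins,
    // unmatched hostnames yield null.
    static String fromHostName(Map<String, String> namespaceToPatterns, String hostname) {
        for (Map.Entry<String, String> entry : namespaceToPatterns.entrySet()) {
            for (String regex : entry.getValue().split("\\s+")) {
                if (!regex.isEmpty() && Pattern.matches(regex, hostname)) {
                    return entry.getKey();
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        final Map<String, String> patterns = new LinkedHashMap<>();
        patterns.put("Namespace1", "^.*\\.c1\\.example.com$ ^192.168.1.[\\d]+$");
        patterns.put("Namespace2", "^.*\\.c2\\.example.com$ ^192.168.2.[\\d]+$");
        System.out.println(fromHostName(patterns, "192.168.1.10")); // Namespace1
        System.out.println(fromHostName(patterns, "192.168.3.30")); // null
    }
}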