From 79a7014a95dc3087f88248c732fb1e4ad8e6e128 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Fri, 9 Mar 2018 09:27:20 +0900 Subject: [PATCH] NIFI-4971: ReportLineageToAtlas complete path can miss one-time lineages - Separated Hook message de-duplication logic from NiFiAtlasHook to NotificationSender - NiFiAtlasHook used to send individual CREATE_ENTITY messages for each entities, this commit changed it to bundle all new entities into a single CREATE_ENTITY to preserve entity creation order, so that new DataSet entities can be referred from new nifi_flow_path entities - Added more unit tests This closes #2542 Signed-off-by: Mike Thomsen --- .../apache/nifi/atlas/hook/NiFiAtlasHook.java | 68 +++ .../NotificationSender.java} | 246 +++++++---- .../lineage/AbstractLineageStrategy.java | 4 +- .../lineage/CompleteFlowPathLineage.java | 32 +- .../atlas/reporting/ReportLineageToAtlas.java | 2 +- .../atlas/hook/TestNotificationSender.java | 417 ++++++++++++++++++ 6 files changed, 688 insertions(+), 81 deletions(-) create mode 100644 nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java rename nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/{NiFiAtlasHook.java => hook/NotificationSender.java} (54%) create mode 100644 nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNotificationSender.java diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java new file mode 100644 index 0000000000..4916337522 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas.hook; + +import org.apache.atlas.hook.AtlasHook; +import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; +import org.apache.nifi.atlas.NiFiAtlasClient; +import org.apache.nifi.atlas.provenance.lineage.LineageContext; + +import java.util.ArrayList; +import java.util.List; + +/** + * This class is not thread-safe as it holds uncommitted notification messages within instance. + * {@link #addMessage(HookNotificationMessage)} and {@link #commitMessages()} should be used serially from a single thread. 
+ */ +public class NiFiAtlasHook extends AtlasHook implements LineageContext { + + public static final String NIFI_USER = "nifi"; + + private static final String CONF_PREFIX = "atlas.hook.nifi."; + private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries"; + + private NiFiAtlasClient atlasClient; + + public void setAtlasClient(NiFiAtlasClient atlasClient) { + this.atlasClient = atlasClient; + } + + @Override + protected String getNumberOfRetriesPropertyKey() { + return HOOK_NUM_RETRIES; + } + + + private final List messages = new ArrayList<>(); + + @Override + public void addMessage(HookNotificationMessage message) { + messages.add(message); + } + + public void commitMessages() { + final NotificationSender notificationSender = new NotificationSender(); + notificationSender.setAtlasClient(atlasClient); + notificationSender.send(messages, this::notifyEntities); + } + + public void close() { + if (notificationInterface != null) { + notificationInterface.close(); + } + } +} diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java similarity index 54% rename from nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java rename to nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java index 58945d5f57..4d599f1cd8 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java @@ -14,33 +14,42 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.nifi.atlas; +package org.apache.nifi.atlas.hook; import com.sun.jersey.api.client.ClientResponse; import org.apache.atlas.AtlasServiceException; -import org.apache.atlas.hook.AtlasHook; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasObjectId; -import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest; -import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; +import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.persistence.Id; -import org.apache.nifi.atlas.provenance.lineage.LineageContext; +import org.apache.nifi.atlas.AtlasUtils; +import org.apache.nifi.atlas.NiFiAtlasClient; import org.apache.nifi.util.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.Stream; +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.toMap; +import static org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_CREATE; import static org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE; import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName; +import static org.apache.nifi.atlas.hook.NiFiAtlasHook.NIFI_USER; import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID; import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS; import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS; @@ -49,16 +58,12 @@ import static 
org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME; import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH; /** - * This class is not thread-safe as it holds uncommitted notification messages within instance. - * {@link #addMessage(HookNotificationMessage)} and {@link #commitMessages()} should be used serially from a single thread. + * This class implements Atlas hook notification message deduplication mechanism. + * Separated from {@link NiFiAtlasHook} for better testability. */ -public class NiFiAtlasHook extends AtlasHook implements LineageContext { +class NotificationSender { - public static final String NIFI_USER = "nifi"; - - private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasHook.class); - private static final String CONF_PREFIX = "atlas.hook.nifi."; - private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries"; + private static final Logger logger = LoggerFactory.getLogger(NotificationSender.class); private NiFiAtlasClient atlasClient; @@ -71,7 +76,6 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext { */ private final Map typedQualifiedNameToRef; - private static Map createCache(final int maxSize) { return new LinkedHashMap(maxSize, 0.75f, true) { @Override @@ -81,7 +85,7 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext { }; } - public NiFiAtlasHook() { + NotificationSender() { final int qualifiedNameCacheSize = 10_000; this.guidToQualifiedName = createCache(qualifiedNameCacheSize); @@ -89,70 +93,160 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext { this.typedQualifiedNameToRef = createCache(dataSetRefCacheSize); } - public void setAtlasClient(NiFiAtlasClient atlasClient) { - this.atlasClient = atlasClient; - } - - @Override - protected String getNumberOfRetriesPropertyKey() { - return HOOK_NUM_RETRIES; - } - - - private final List messages = new ArrayList<>(); - - @Override - public void addMessage(HookNotificationMessage message) { - 
messages.add(message); - } - private class Metrics { final long startedAt = System.currentTimeMillis(); + + /** + * The total number of messages passed to commitMessages. + */ int totalMessages; + /** + * The number of CreateEntityRequest messages before de-duplication. + */ + int createMessages; + /** + * The number of unique CreateEntityRequest messages for 'nifi_flow_path'. + */ + int uniqueFlowPathCreates; + /** + * The number of unique CreateEntityRequest messages except 'nifi_flow_path'. + */ + int uniqueOtherCreates; int partialNiFiFlowPathUpdates; - int dedupedPartialNiFiFlowPathUpdates; + int uniquePartialNiFiFlowPathUpdates; int otherMessages; int flowPathSearched; int dataSetSearched; int dataSetCacheHit; - private void log(String message) { - logger.debug(String.format("%s, %d ms passed, totalMessages=%d," + - " partialNiFiFlowPathUpdates=%d, dedupedPartialNiFiFlowPathUpdates=%d, otherMessage=%d," + - " flowPathSearched=%d, dataSetSearched=%d, dataSetCacheHit=%s," + - " guidToQualifiedName.size=%d, typedQualifiedNameToRef.size=%d", + + private String toLogString(String message) { + return String.format("%s, %d ms passed, totalMessages=%d," + + " createMessages=%d, uniqueFlowPathCreates=%d, uniqueOtherCreates=%d," + + " partialNiFiFlowPathUpdates=%d, uniquePartialNiFiFlowPathUpdates=%d, otherMessage=%d," + + " flowPathSearched=%d, dataSetSearched=%d, dataSetCacheHit=%s," + + " guidToQualifiedName.size=%d, typedQualifiedNameToRef.size=%d", message, System.currentTimeMillis() - startedAt, totalMessages, - partialNiFiFlowPathUpdates, dedupedPartialNiFiFlowPathUpdates, otherMessages, + createMessages, uniqueFlowPathCreates, uniqueOtherCreates, + partialNiFiFlowPathUpdates, uniquePartialNiFiFlowPathUpdates, otherMessages, flowPathSearched, dataSetSearched, dataSetCacheHit, - guidToQualifiedName.size(), typedQualifiedNameToRef.size())); + guidToQualifiedName.size(), typedQualifiedNameToRef.size()); } } - public void commitMessages() { - final Map> 
partialNiFiFlowPathUpdateAndOthers - = messages.stream().collect(Collectors.groupingBy(msg - -> ENTITY_PARTIAL_UPDATE.equals(msg.getType()) - && TYPE_NIFI_FLOW_PATH.equals(((EntityPartialUpdateRequest)msg).getTypeName()) - && ATTR_QUALIFIED_NAME.equals(((EntityPartialUpdateRequest)msg).getAttribute()) - )); + void setAtlasClient(NiFiAtlasClient atlasClient) { + this.atlasClient = atlasClient; + } + private Predicate distinctReferenceable() { + final Set keys = new HashSet<>(); + return r -> { + final String key = AtlasUtils.toTypedQualifiedName(r.getTypeName(), (String) r.get(ATTR_QUALIFIED_NAME)); + return keys.add(key); + }; + } - final List otherMessages = partialNiFiFlowPathUpdateAndOthers.computeIfAbsent(false, k -> Collections.emptyList()); - final List partialNiFiFlowPathUpdates = partialNiFiFlowPathUpdateAndOthers.computeIfAbsent(true, k -> Collections.emptyList()); - logger.info("Commit messages: {} partialNiFiFlowPathUpdate and {} other messages.", partialNiFiFlowPathUpdates.size(), otherMessages.size()); + private List safeGet(Map> map, K key) { + return map.computeIfAbsent(key, k -> Collections.emptyList()); + } + @SuppressWarnings("unchecked") + private void mergeRefs(Referenceable r1, Referenceable r2) { + r1.set(ATTR_INPUTS, mergeRefs((Collection) r1.get(ATTR_INPUTS), (Collection) r2.get(ATTR_INPUTS))); + r1.set(ATTR_OUTPUTS, mergeRefs((Collection) r1.get(ATTR_OUTPUTS), (Collection) r2.get(ATTR_OUTPUTS))); + } + + private Collection mergeRefs(Collection r1, Collection r2) { + final boolean isR1Empty = r1 == null || r1.isEmpty(); + final boolean isR2Empty = r2 == null || r2.isEmpty(); + + if (isR1Empty) { + // r2 may or may not have entities, don't have to merge r1. + return r2; + } else if (isR2Empty) { + // r1 has some entities, don't have to merge r2. + return r1; + } + + // If both have entities, then need to be merged. + return Stream.concat(r1.stream(), r2.stream()).filter(distinctReferenceable()).collect(Collectors.toList()); + } + + /** + *

<p>Send hook notification messages. + * In order to notify relationships between 'nifi_flow_path' and its inputs/outputs, this method sends messages in the following order:</p> + *
<ol> + *
<li>As a single {@link org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest} message: + *
<ul> + *
<li>New entities except 'nifi_flow_path', including DataSets such as 'nifi_queue', 'kafka_topic' or 'hive_table' ... etc, + * so that 'nifi_flow_path' can refer to them</li> + *
<li>New 'nifi_flow_path' entities, entities order is guaranteed in a single message</li> + *
</ul> + *
</li> + *
<li>Update 'nifi_flow_path' messages, before notifying update messages, this method fetches existing 'nifi_flow_path' entity + * to merge new inputs/outputs element with existing ones, so that existing ones will not be removed.</li> + *
<li>Other messages, i.e. everything except the entity creations and 'nifi_flow_path' updates above</li> + *
</ol> + *
<p>Messages having the same type and qualified name will be de-duplicated before being sent.</p>

+ * @param messages list of messages to be sent + * @param notifier responsible for sending notification messages, its accept method can be called multiple times + */ + void send(final List messages, final Consumer> notifier) { final Metrics metrics = new Metrics(); - metrics.totalMessages = messages.size(); - metrics.partialNiFiFlowPathUpdates = partialNiFiFlowPathUpdates.size(); - metrics.otherMessages = otherMessages.size(); - try { - // Notify other messages first. - notifyEntities(otherMessages); + metrics.totalMessages = messages.size(); - // De-duplicate messages. - final List deduplicatedMessages = partialNiFiFlowPathUpdates.stream().map(msg -> (EntityPartialUpdateRequest) msg) + final Map> createAndOthers = messages.stream().collect(groupingBy(msg -> ENTITY_CREATE.equals(msg.getType()))); + + final List creates = safeGet(createAndOthers, true); + metrics.createMessages = creates.size(); + + final Map> newFlowPathsAndOtherEntities = creates.stream() + .flatMap(msg -> ((HookNotification.EntityCreateRequest) msg).getEntities().stream()) + .collect(groupingBy(ref -> TYPE_NIFI_FLOW_PATH.equals(ref.typeName))); + + // Deduplicate same entity creation messages. + final List newEntitiesExceptFlowPaths = safeGet(newFlowPathsAndOtherEntities, false) + .stream().filter(distinctReferenceable()).collect(Collectors.toList()); + + // Deduplicate same flow paths and also merge inputs and outputs + final Collection newFlowPaths = safeGet(newFlowPathsAndOtherEntities, true).stream() + .collect(toMap(ref -> ref.get(ATTR_QUALIFIED_NAME), ref -> ref, (r1, r2) -> { + // Merge inputs and outputs. + mergeRefs(r1, r2); + return r1; + })).values(); + metrics.uniqueFlowPathCreates = newFlowPaths.size(); + metrics.uniqueOtherCreates = newEntitiesExceptFlowPaths.size(); + + + // 1-1. Notify new entities except 'nifi_flow_path' + // 1-2. 
Notify new 'nifi_flow_path' + List newEntities = new ArrayList<>(); + newEntities.addAll(newEntitiesExceptFlowPaths); + newEntities.addAll(newFlowPaths); + if (!newEntities.isEmpty()) { + notifier.accept(Collections.singletonList(new HookNotification.EntityCreateRequest(NIFI_USER, newEntities))); + } + + final Map> partialNiFiFlowPathUpdateAndOthers + = safeGet(createAndOthers, false).stream().collect(groupingBy(msg + -> ENTITY_PARTIAL_UPDATE.equals(msg.getType()) + && TYPE_NIFI_FLOW_PATH.equals(((HookNotification.EntityPartialUpdateRequest)msg).getTypeName()) + && ATTR_QUALIFIED_NAME.equals(((HookNotification.EntityPartialUpdateRequest)msg).getAttribute()) + )); + + + // These updates are made against existing flow path entities. + final List partialNiFiFlowPathUpdates = safeGet(partialNiFiFlowPathUpdateAndOthers, true); + final List otherMessages = safeGet(partialNiFiFlowPathUpdateAndOthers, false); + metrics.partialNiFiFlowPathUpdates = partialNiFiFlowPathUpdates.size(); + metrics.otherMessages = otherMessages.size(); + + + // 2. Notify de-duplicated 'nifi_flow_path' updates + final List deduplicatedMessages = partialNiFiFlowPathUpdates.stream().map(msg -> (HookNotification.EntityPartialUpdateRequest) msg) // Group by nifi_flow_path qualifiedName value. - .collect(Collectors.groupingBy(EntityPartialUpdateRequest::getAttributeValue)).entrySet().stream() + .collect(groupingBy(HookNotification.EntityPartialUpdateRequest::getAttributeValue)).entrySet().stream() .map(entry -> { final String flowPathQualifiedName = entry.getKey(); final Map distinctInputs; @@ -178,7 +272,7 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext { } // Merge all inputs and outputs for this nifi_flow_path. 
- for (EntityPartialUpdateRequest msg : entry.getValue()) { + for (HookNotification.EntityPartialUpdateRequest msg : entry.getValue()) { fromReferenceable(msg.getEntity().get(ATTR_INPUTS), metrics) .entrySet().stream().filter(ref -> !distinctInputs.containsKey(ref.getKey())) .forEach(ref -> distinctInputs.put(ref.getKey(), ref.getValue())); @@ -194,18 +288,22 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext { // org.json4s.package$MappingException: Can't find ScalaSig for class org.apache.atlas.typesystem.Referenceable flowPathRef.set(ATTR_INPUTS, new ArrayList<>(distinctInputs.values())); flowPathRef.set(ATTR_OUTPUTS, new ArrayList<>(distinctOutputs.values())); - return new EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH, + return new HookNotification.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME, flowPathQualifiedName, flowPathRef); }) .filter(Objects::nonNull) .collect(Collectors.toList()); - metrics.dedupedPartialNiFiFlowPathUpdates = deduplicatedMessages.size(); - notifyEntities(deduplicatedMessages); + metrics.uniquePartialNiFiFlowPathUpdates = deduplicatedMessages.size(); + notifier.accept(deduplicatedMessages); + + // 3. Notify other messages + notifier.accept(otherMessages); + } finally { - metrics.log("Committed"); - messages.clear(); + logger.info(metrics.toLogString("Finished")); } + } /** @@ -223,7 +321,7 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext { return new HashMap<>(); } - final List> refs = (List>) _refs; + final Collection> refs = (Collection>) _refs; return refs.stream().map(ref -> { // Existing reference should has a GUID. 
final String typeName = (String) ref.get(ATTR_TYPENAME); @@ -256,8 +354,8 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext { return new Tuple<>(refQualifiedName, typedQualifiedNameToRef.get(toTypedQualifiedName(typeName, refQualifiedName))); }).filter(Objects::nonNull).filter(tuple -> tuple.getValue() != null) // If duplication happens, use new value. - .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue, (oldValue, newValue) -> { - logger.warn("Duplicated qualified name was found, use the new one. oldValue={}, newValue={}", new Object[]{oldValue, newValue}); + .collect(toMap(Tuple::getKey, Tuple::getValue, (oldValue, newValue) -> { + logger.debug("Duplicated qualified name was found, use the new one. oldValue={}, newValue={}", new Object[]{oldValue, newValue}); return newValue; })); } @@ -268,7 +366,7 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext { return Collections.emptyMap(); } - final List refs = (List) _refs; + final Collection refs = (Collection) _refs; return refs.stream().map(ref -> { // This ref is created within this reporting cycle, and it may not have GUID assigned yet, if it is a brand new reference. // If cache has the Reference, then use it because instances in the cache are guaranteed to have GUID assigned. 
@@ -288,13 +386,7 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext { }); return new Tuple<>(refQualifiedName, refFromCacheIfAvailable); - }).filter(Objects::nonNull).filter(tuple -> tuple.getValue() != null) - .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue)); - } - - public void close() { - if (notificationInterface != null) { - notificationInterface.close(); - } + }).filter(tuple -> tuple.getValue() != null) + .collect(toMap(Tuple::getKey, Tuple::getValue)); } } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java index 11d6e8bd4b..a253c7d7ef 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java @@ -40,7 +40,7 @@ import java.util.stream.Collectors; import static org.apache.nifi.atlas.AtlasUtils.toStr; import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName; -import static org.apache.nifi.atlas.NiFiAtlasHook.NIFI_USER; +import static org.apache.nifi.atlas.hook.NiFiAtlasHook.NIFI_USER; import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS; import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; import static org.apache.nifi.atlas.NiFiTypes.ATTR_NIFI_FLOW; @@ -136,7 +136,7 @@ public abstract class AbstractLineageStrategy implements LineageStrategy { } @SuppressWarnings("unchecked") - private boolean addDataSetRefs(Set refsToAdd, Referenceable nifiFlowPath, String targetAttribute) { + protected boolean addDataSetRefs(Set refsToAdd, Referenceable nifiFlowPath, String targetAttribute) { if (refsToAdd != null && 
!refsToAdd.isEmpty()) { // If nifiFlowPath already has a given dataSetRef, then it needs not to be created. diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java index 4437bfce69..d3ac658efd 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java @@ -42,7 +42,9 @@ import java.util.zip.CRC32; import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName; import static org.apache.nifi.atlas.AtlasUtils.toStr; import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS; import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS; import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE; import static org.apache.nifi.provenance.ProvenanceEventType.DROP; @@ -76,8 +78,36 @@ public class CompleteFlowPathLineage extends AbstractLineageStrategy { createCompleteFlowPath(nifiFlow, lineagePath, createdFlowPaths); for (Tuple createdFlowPath : createdFlowPaths) { final NiFiFlowPath flowPath = createdFlowPath.getKey(); - createEntity(toReferenceable(flowPath, nifiFlow)); + // NOTE 1: FlowPath creation and DataSet references should be reported separately + // ------------------------------------------------------------------------------ + // For example, with following provenance event inputs: + // CREATE(F1), FORK (F1 -> F2, F3), DROP(F1), SEND (F2), SEND(F3), DROP(F2), DROP(F3), + // there 
is no guarantee that DROP(F2) and DROP(F3) are processed within the same cycle. + // If DROP(F3) is processed in different cycle, it needs to be added to the existing FlowPath + // that contains F1 -> F2, to be F1 -> F2, F3. + // Execution cycle 1: Path1 (source of F1 -> ForkA), ForkA_queue (F1 -> F2), Path2 (ForkA -> dest of F2) + // Execution cycle 2: Path1 (source of F1 -> ForkB), ForkB_queue (F1 -> F3), Path3 (ForkB -> dest of F3) + + // NOTE 2: Both FlowPath creation and FlowPath update messages are required + // ------------------------------------------------------------------------ + // For the 1st time when a lineage is found, nifi_flow_path and referred DataSets are created. + // If we notify these entities by a create 3 entities message (Path1, DataSet1, DataSet2) + // followed by 1 partial update message to add lineage (DataSet1 -> Path1 -> DataSet2), then + // the update message may arrive at Atlas earlier than the create message gets processed. + // If that happens, lineage among these entities will be missed. + // But as written in NOTE1, there is a case where existing nifi_flow_paths need to be updated. + // Also, we don't know if this is the 1st time or 2nd or later. + // So, we need to notify entity creation and also partial update messages. + + // Create flow path entity with DataSet refs. + final Referenceable flowPathRef = toReferenceable(flowPath, nifiFlow); + final DataSetRefs dataSetRefs = createdFlowPath.getValue(); + addDataSetRefs(dataSetRefs.getInputs(), flowPathRef, ATTR_INPUTS); + addDataSetRefs(dataSetRefs.getOutputs(), flowPathRef, ATTR_OUTPUTS); + createEntity(flowPathRef); + // Also, sending partial update message to update existing flow_path. 
addDataSetRefs(nifiFlow, Collections.singleton(flowPath), createdFlowPath.getValue()); + } createdFlowPaths.clear(); } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java index 29ae013ccd..29926baa64 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java @@ -56,8 +56,8 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.atlas.hook.NiFiAtlasHook; import org.apache.nifi.atlas.NiFiAtlasClient; -import org.apache.nifi.atlas.NiFiAtlasHook; import org.apache.nifi.atlas.NiFiFlow; import org.apache.nifi.atlas.NiFiFlowAnalyzer; import org.apache.nifi.atlas.provenance.AnalysisContext; diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNotificationSender.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNotificationSender.java new file mode 100644 index 0000000000..8b356e140b --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNotificationSender.java @@ -0,0 +1,417 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas.hook; + +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.AtlasUtils; +import org.apache.nifi.atlas.NiFiAtlasClient; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import static java.util.Arrays.asList; +import static java.util.Collections.singleton; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_DATASET; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE; +import static org.apache.nifi.atlas.hook.NiFiAtlasHook.NIFI_USER; 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link NotificationSender}.
+ * Each test feeds a list of hook messages to {@code send} and inspects the notifications
+ * that were handed to a recording {@link Notifier}, by notification index.
+ */
+public class TestNotificationSender {
+
+    private static final Logger logger = LoggerFactory.getLogger(TestNotificationSender.class);
+
+    /**
+     * Records every batch of messages it is given, in call order, so that tests can assert on them.
+     */
+    private static class Notifier implements Consumer<List<HookNotification.HookNotificationMessage>> {
+        private final List<List<HookNotification.HookNotificationMessage>> notifications = new ArrayList<>();
+        @Override
+        public void accept(List<HookNotification.HookNotificationMessage> messages) {
+            logger.info("notified at {}, {}", notifications.size(), messages);
+            notifications.add(messages);
+        }
+    }
+
+    /**
+     * With no input messages, the notifications at index 0 and 1 are both expected to be empty.
+     */
+    @Test
+    public void testZeroMessage() {
+        final NotificationSender sender = new NotificationSender();
+        final List<HookNotification.HookNotificationMessage> messages = Collections.emptyList();
+        final Notifier notifier = new Notifier();
+        sender.send(messages, notifier);
+        assertEquals(0, notifier.notifications.get(0).size());
+        assertEquals(0, notifier.notifications.get(1).size());
+    }
+
+    /**
+     * Creates a Referenceable of the given type having only its qualified name set.
+     */
+    private Referenceable createRef(String type, String qname) {
+        final Referenceable ref = new Referenceable(type);
+        ref.set(ATTR_QUALIFIED_NAME, qname);
+        return ref;
+    }
+
+    /**
+     * Asserts that the notification at the given index consists of a single EntityCreateRequest
+     * containing exactly the expected entities.
+     *
+     * @param notifier recorded notifications
+     * @param notificationIndex index of the notification to check
+     * @param expects expected entities; non nifi_flow_path entities must be listed before any nifi_flow_path
+     */
+    @SuppressWarnings("unchecked")
+    private void assertCreateMessage(Notifier notifier, int notificationIndex, Referenceable ... expects) {
+        assertTrue(notifier.notifications.size() > notificationIndex);
+        final List<HookNotification.HookNotificationMessage> messages = notifier.notifications.get(notificationIndex);
+        assertEquals(1, messages.size());
+        final HookNotification.EntityCreateRequest message = (HookNotification.EntityCreateRequest) messages.get(0);
+        assertEquals(expects.length, message.getEntities().size());
+
+        // The use of 'flatMap' at NotificationSender does not preserve actual entities order.
+        // Use typed qname map to assert regardless of ordering.
+        final Map<String, Referenceable> entities = message.getEntities().stream().collect(Collectors.toMap(
+                ref -> AtlasUtils.toTypedQualifiedName(ref.getTypeName(), (String) ref.get(ATTR_QUALIFIED_NAME)), ref -> ref));
+
+        boolean hasFlowPathSeen = false;
+        for (Referenceable expect : expects) {
+            final String typeName = expect.getTypeName();
+            final Referenceable actual = entities.get(AtlasUtils.toTypedQualifiedName(typeName, (String) expect.get(ATTR_QUALIFIED_NAME)));
+            assertNotNull(actual);
+            assertEquals(typeName, actual.getTypeName());
+            assertEquals(expect.get(ATTR_QUALIFIED_NAME), actual.get(ATTR_QUALIFIED_NAME));
+
+            if (TYPE_NIFI_FLOW_PATH.equals(typeName)) {
+                assertIOReferences(expect, actual, ATTR_INPUTS);
+                assertIOReferences(expect, actual, ATTR_OUTPUTS);
+                hasFlowPathSeen = true;
+            } else {
+                assertFalse("Types other than nifi_flow_path should be created before any nifi_flow_path entity.", hasFlowPathSeen);
+            }
+        }
+    }
+
+    /**
+     * Asserts that the references held by the given attribute match pairwise, in iteration order,
+     * by type name and qualified name. Nothing is checked if the expected entity does not have the attribute.
+     */
+    @SuppressWarnings("unchecked")
+    private void assertIOReferences(Referenceable expect, Referenceable actual, String attrName) {
+        final Collection<Referenceable> expectedRefs = (Collection<Referenceable>) expect.get(attrName);
+        if (expectedRefs != null) {
+            final Collection<Referenceable> actualRefs = (Collection<Referenceable>) actual.get(attrName);
+            // Fail with a readable message rather than a NullPointerException when the attribute is absent.
+            assertNotNull("Actual entity does not have attribute " + attrName, actualRefs);
+            assertEquals(expectedRefs.size(), actualRefs.size());
+            final Iterator<Referenceable> actualIterator = actualRefs.iterator();
+            for (Referenceable expectedRef : expectedRefs) {
+                final Referenceable actualRef = actualIterator.next();
+                assertEquals(expectedRef.getTypeName(), actualRef.getTypeName());
+                assertEquals(expectedRef.get(ATTR_QUALIFIED_NAME), actualRef.get(ATTR_QUALIFIED_NAME));
+            }
+        }
+    }
+
+    /**
+     * Asserts that the notification at the given index consists of EntityPartialUpdateRequests,
+     * one per expected flow path and in the same order, and that every input/output carried by
+     * each request is contained in the corresponding expected entity.
+     */
+    @SuppressWarnings("unchecked")
+    private void assertUpdateFlowPathMessage(Notifier notifier, int notificationIndex, Referenceable ... expects) {
+        assertTrue(notifier.notifications.size() > notificationIndex);
+        final List<HookNotification.HookNotificationMessage> messages = notifier.notifications.get(notificationIndex);
+        assertEquals(expects.length, messages.size());
+        for (int i = 0; i < expects.length; i++) {
+            final Referenceable expect = expects[i];
+            final HookNotification.EntityPartialUpdateRequest actual = (HookNotification.EntityPartialUpdateRequest) messages.get(i);
+            assertEquals(expect.getTypeName(), actual.getTypeName());
+            assertEquals(ATTR_QUALIFIED_NAME, actual.getAttribute());
+            assertEquals(expect.get(ATTR_QUALIFIED_NAME), actual.getAttributeValue());
+
+            final Collection<Referenceable> expIn = (Collection<Referenceable>) expect.get(ATTR_INPUTS);
+            final Collection<Referenceable> expOut = (Collection<Referenceable>) expect.get(ATTR_OUTPUTS);
+            assertTrue(expIn.containsAll((Collection<Referenceable>) actual.getEntity().get(ATTR_INPUTS)));
+            assertTrue(expOut.containsAll((Collection<Referenceable>) actual.getEntity().get(ATTR_OUTPUTS)));
+        }
+    }
+
+    /**
+     * A single create request for a nifi_queue is notified as one create message.
+     */
+    @Test
+    public void testOneCreateDataSetMessage() {
+        final NotificationSender sender = new NotificationSender();
+        final Referenceable queue1 = createRef(TYPE_NIFI_QUEUE, "queue1@test");
+        final List<HookNotification.HookNotificationMessage> messages = Collections.singletonList(
+                new HookNotification.EntityCreateRequest(NIFI_USER, queue1));
+        final Notifier notifier = new Notifier();
+        sender.send(messages, notifier);
+
+        // EntityCreateRequest containing nifi_queue.
+        assertCreateMessage(notifier, 0, queue1);
+    }
+
+    /**
+     * Create requests for the same entities coming from different FlowFiles are expected to be
+     * merged into a single create message, with inputs/outputs of the same nifi_flow_path combined.
+     */
+    @Test
+    public void testCreateDataSetMessageDeduplication() {
+        // Simulating the complete path strategy, where 4 FlowFiles went through the same partial flow.
+        // FF1 was ingested from data1, and so was FF2 from data2.
+        // Then FF1 was FORKed to FF11 and FF12, then sent to data11 and data12.
+        // Similarly FF2 was FORKed to FF21 and FF22, then sent to data21 and data22.
+        // FF3 went through the same as FF1, so did FF4 as FF2.
+        // All of those provenance events were processed within a single ReportLineageToAtlas cycle.
+
+        // FF1: data1 -> pathA1 -> FORK11 (FF11) -> pathB11 -> data11
+        //                      -> FORK12 (FF12) -> pathB12 -> data12
+        // FF2: data2 -> pathA2 -> FORK21 (FF21) -> pathB21 -> data21
+        //                      -> FORK22 (FF22) -> pathB22 -> data22
+        // FF3: data1 -> pathA1 -> FORK11 (FF31) -> pathB11 -> data11
+        //                      -> FORK12 (FF32) -> pathB12 -> data12
+        // FF4: data2 -> pathA2 -> FORK21 (FF41) -> pathB21 -> data21
+        //                      -> FORK22 (FF42) -> pathB22 -> data22
+
+        // As a result, following lineages are reported to Atlas:
+        // data1 -> pathA1 -> FORK11 -> pathB11 -> data11
+        //                 -> FORK12 -> pathB12 -> data12
+        // data2 -> pathA2 -> FORK21 -> pathB21 -> data21
+        //                 -> FORK22 -> pathB22 -> data22
+
+        final NotificationSender sender = new NotificationSender();
+
+        // From FF1
+        final Referenceable ff1_data1 = createRef(TYPE_DATASET, "data1@test");
+        final Referenceable ff1_data11 = createRef(TYPE_DATASET, "data11@test");
+        final Referenceable ff1_data12 = createRef(TYPE_DATASET, "data12@test");
+        final Referenceable ff1_pathA11 = createRef(TYPE_NIFI_FLOW_PATH, "A::0001@test");
+        final Referenceable ff1_pathA12 = createRef(TYPE_NIFI_FLOW_PATH, "A::0001@test");
+        final Referenceable ff1_fork11 = createRef(TYPE_NIFI_QUEUE, "B::0011@test");
+        final Referenceable ff1_fork12 = createRef(TYPE_NIFI_QUEUE, "B::0012@test");
+        final Referenceable ff1_pathB11 = createRef(TYPE_NIFI_FLOW_PATH, "B::0011@test");
+        final Referenceable ff1_pathB12 = createRef(TYPE_NIFI_FLOW_PATH, "B::0012@test");
+        // From FF11
+        ff1_pathA11.set(ATTR_INPUTS, singleton(ff1_data1));
+        ff1_pathA11.set(ATTR_OUTPUTS, singleton(ff1_fork11));
+        ff1_pathB11.set(ATTR_INPUTS, singleton(ff1_fork11));
+        ff1_pathB11.set(ATTR_OUTPUTS, singleton(ff1_data11));
+        // From FF12
+        ff1_pathA12.set(ATTR_INPUTS, singleton(ff1_data1));
+        ff1_pathA12.set(ATTR_OUTPUTS, singleton(ff1_fork12));
+        ff1_pathB12.set(ATTR_INPUTS, singleton(ff1_fork12));
+        ff1_pathB12.set(ATTR_OUTPUTS, singleton(ff1_data12));
+
+        // From FF2
+        final Referenceable ff2_data2 = createRef(TYPE_DATASET, "data2@test");
+        final Referenceable ff2_data21 = createRef(TYPE_DATASET, "data21@test");
+        final Referenceable ff2_data22 = createRef(TYPE_DATASET, "data22@test");
+        final Referenceable ff2_pathA21 = createRef(TYPE_NIFI_FLOW_PATH, "A::0002@test");
+        final Referenceable ff2_pathA22 = createRef(TYPE_NIFI_FLOW_PATH, "A::0002@test");
+        final Referenceable ff2_fork21 = createRef(TYPE_NIFI_QUEUE, "B::0021@test");
+        final Referenceable ff2_fork22 = createRef(TYPE_NIFI_QUEUE, "B::0022@test");
+        final Referenceable ff2_pathB21 = createRef(TYPE_NIFI_FLOW_PATH, "B::0021@test");
+        final Referenceable ff2_pathB22 = createRef(TYPE_NIFI_FLOW_PATH, "B::0022@test");
+        // From FF21
+        ff2_pathA21.set(ATTR_INPUTS, singleton(ff2_data2));
+        ff2_pathA21.set(ATTR_OUTPUTS, singleton(ff2_fork21));
+        ff2_pathB21.set(ATTR_INPUTS, singleton(ff2_fork21));
+        ff2_pathB21.set(ATTR_OUTPUTS, singleton(ff2_data21));
+        // From FF22
+        ff2_pathA22.set(ATTR_INPUTS, singleton(ff2_data2));
+        ff2_pathA22.set(ATTR_OUTPUTS, singleton(ff2_fork22));
+        ff2_pathB22.set(ATTR_INPUTS, singleton(ff2_fork22));
+        ff2_pathB22.set(ATTR_OUTPUTS, singleton(ff2_data22));
+
+        // From FF3
+        final Referenceable ff3_data1 = createRef(TYPE_DATASET, "data1@test");
+        final Referenceable ff3_data11 = createRef(TYPE_DATASET, "data11@test");
+        final Referenceable ff3_data12 = createRef(TYPE_DATASET, "data12@test");
+        final Referenceable ff3_pathA11 = createRef(TYPE_NIFI_FLOW_PATH, "A::0001@test");
+        final Referenceable ff3_pathA12 = createRef(TYPE_NIFI_FLOW_PATH, "A::0001@test");
+        final Referenceable ff3_fork11 = createRef(TYPE_NIFI_QUEUE, "B::0011@test");
+        final Referenceable ff3_fork12 = createRef(TYPE_NIFI_QUEUE, "B::0012@test");
+        final Referenceable ff3_pathB11 = createRef(TYPE_NIFI_FLOW_PATH, "B::0011@test");
+        final Referenceable ff3_pathB12 = createRef(TYPE_NIFI_FLOW_PATH, "B::0012@test");
+        // From FF31
+        ff3_pathA11.set(ATTR_INPUTS, singleton(ff3_data1));
+        ff3_pathA11.set(ATTR_OUTPUTS, singleton(ff3_fork11));
+        ff3_pathB11.set(ATTR_INPUTS, singleton(ff3_fork11));
+        ff3_pathB11.set(ATTR_OUTPUTS, singleton(ff3_data11));
+        // From FF32
+        ff3_pathA12.set(ATTR_INPUTS, singleton(ff3_data1));
+        ff3_pathA12.set(ATTR_OUTPUTS, singleton(ff3_fork12));
+        ff3_pathB12.set(ATTR_INPUTS, singleton(ff3_fork12));
+        ff3_pathB12.set(ATTR_OUTPUTS, singleton(ff3_data12));
+
+        // From FF4
+        final Referenceable ff4_data2 = createRef(TYPE_DATASET, "data2@test");
+        final Referenceable ff4_data21 = createRef(TYPE_DATASET, "data21@test");
+        final Referenceable ff4_data22 = createRef(TYPE_DATASET, "data22@test");
+        final Referenceable ff4_pathA21 = createRef(TYPE_NIFI_FLOW_PATH, "A::0002@test");
+        final Referenceable ff4_pathA22 = createRef(TYPE_NIFI_FLOW_PATH, "A::0002@test");
+        final Referenceable ff4_fork21 = createRef(TYPE_NIFI_QUEUE, "B::0021@test");
+        final Referenceable ff4_fork22 = createRef(TYPE_NIFI_QUEUE, "B::0022@test");
+        final Referenceable ff4_pathB21 = createRef(TYPE_NIFI_FLOW_PATH, "B::0021@test");
+        final Referenceable ff4_pathB22 = createRef(TYPE_NIFI_FLOW_PATH, "B::0022@test");
+        // From FF41
+        ff4_pathA21.set(ATTR_INPUTS, singleton(ff4_data2));
+        ff4_pathA21.set(ATTR_OUTPUTS, singleton(ff4_fork21));
+        ff4_pathB21.set(ATTR_INPUTS, singleton(ff4_fork21));
+        ff4_pathB21.set(ATTR_OUTPUTS, singleton(ff4_data21));
+        // From FF42
+        ff4_pathA22.set(ATTR_INPUTS, singleton(ff4_data2));
+        ff4_pathA22.set(ATTR_OUTPUTS, singleton(ff4_fork22));
+        ff4_pathB22.set(ATTR_INPUTS, singleton(ff4_fork22));
+        ff4_pathB22.set(ATTR_OUTPUTS, singleton(ff4_data22));
+
+
+        final List<HookNotification.HookNotificationMessage> messages = asList(
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff1_data1),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff1_data11),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff1_data12),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff1_pathA11),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff1_pathA12),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff1_fork11),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff1_fork12),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff1_pathB11),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff1_pathB12),
+
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff2_data2),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff2_data21),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff2_data22),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff2_pathA21),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff2_pathA22),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff2_fork21),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff2_fork22),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff2_pathB21),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff2_pathB22),
+
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff3_data1),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff3_data11),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff3_data12),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff3_pathA11),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff3_pathA12),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff3_fork11),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff3_fork12),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff3_pathB11),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff3_pathB12),
+
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff4_data2),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff4_data21),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff4_data22),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff4_pathA21),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff4_pathA22),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff4_fork21),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff4_fork22),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff4_pathB21),
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff4_pathB22)
+        );
+
+        final Notifier notifier = new Notifier();
+        sender.send(messages, notifier);
+
+        // EntityCreateRequest, same entities get de-duplicated. nifi_flow_path is created after other types.
+        final Referenceable r_data1 = createRef(TYPE_DATASET, "data1@test");
+        final Referenceable r_data11 = createRef(TYPE_DATASET, "data11@test");
+        final Referenceable r_data12 = createRef(TYPE_DATASET, "data12@test");
+        final Referenceable r_pathA1 = createRef(TYPE_NIFI_FLOW_PATH, "A::0001@test");
+        final Referenceable r_fork11 = createRef(TYPE_NIFI_QUEUE, "B::0011@test");
+        final Referenceable r_fork12 = createRef(TYPE_NIFI_QUEUE, "B::0012@test");
+        final Referenceable r_pathB11 = createRef(TYPE_NIFI_FLOW_PATH, "B::0011@test");
+        final Referenceable r_pathB12 = createRef(TYPE_NIFI_FLOW_PATH, "B::0012@test");
+        r_pathA1.set(ATTR_INPUTS, singleton(r_data1));
+        r_pathA1.set(ATTR_OUTPUTS, asList(r_fork11, r_fork12));
+        r_pathB11.set(ATTR_INPUTS, singleton(r_fork11));
+        r_pathB11.set(ATTR_OUTPUTS, singleton(r_data11));
+        r_pathB12.set(ATTR_INPUTS, singleton(r_fork12));
+        r_pathB12.set(ATTR_OUTPUTS, singleton(r_data12));
+
+        final Referenceable r_data2 = createRef(TYPE_DATASET, "data2@test");
+        final Referenceable r_data21 = createRef(TYPE_DATASET, "data21@test");
+        final Referenceable r_data22 = createRef(TYPE_DATASET, "data22@test");
+        final Referenceable r_pathA2 = createRef(TYPE_NIFI_FLOW_PATH, "A::0002@test");
+        final Referenceable r_fork21 = createRef(TYPE_NIFI_QUEUE, "B::0021@test");
+        final Referenceable r_fork22 = createRef(TYPE_NIFI_QUEUE, "B::0022@test");
+        final Referenceable r_pathB21 = createRef(TYPE_NIFI_FLOW_PATH, "B::0021@test");
+        final Referenceable r_pathB22 = createRef(TYPE_NIFI_FLOW_PATH, "B::0022@test");
+        r_pathA2.set(ATTR_INPUTS, singleton(r_data2));
+        r_pathA2.set(ATTR_OUTPUTS, asList(r_fork21, r_fork22));
+        r_pathB21.set(ATTR_INPUTS, singleton(r_fork21));
+        r_pathB21.set(ATTR_OUTPUTS, singleton(r_data21));
+        r_pathB22.set(ATTR_INPUTS, singleton(r_fork22));
+        r_pathB22.set(ATTR_OUTPUTS, singleton(r_data22));
+
+        assertCreateMessage(notifier, 0,
+                r_data1, r_data11, r_data12, r_fork11, r_fork12,
+                r_data2, r_data21, r_data22, r_fork21, r_fork22,
+                r_pathA1, r_pathB11, r_pathB12,
+                r_pathA2, r_pathB21, r_pathB22);
+    }
+
+    /**
+     * Creates a map holding a type name and a GUID, used here as an input/output reference
+     * of an existing entity returned by the mocked Atlas client.
+     */
+    private Map<String, Object> createGuidReference(String type, String guid) {
+        Map<String, Object> map = new HashMap<>();
+        map.put(ATTR_TYPENAME, type);
+        map.put(ATTR_GUID, guid);
+        return map;
+    }
+
+    /**
+     * A partial update adding fileC/fileD to path1, which already has fileA/fileB in Atlas,
+     * is expected to produce a create message for the new DataSets (index 0) followed by
+     * a flow path update whose inputs/outputs are within the existing and new references (index 1).
+     */
+    @Test
+    public void testUpdateFlowPath() throws AtlasServiceException {
+        final NotificationSender sender = new NotificationSender();
+        final Referenceable fileC = createRef("fs_path", "/tmp/file-c.txt@test");
+        final Referenceable fileD = createRef("fs_path", "/tmp/file-d.txt@test");
+
+        // New in/out fileC and fileD are found for path1.
+        final Referenceable newPath1Lineage = createRef(TYPE_NIFI_FLOW_PATH, "path1@test");
+        newPath1Lineage.set(ATTR_INPUTS, singleton(fileC));
+        newPath1Lineage.set(ATTR_OUTPUTS, singleton(fileD));
+
+        final List<HookNotification.HookNotificationMessage> messages = asList(
+                new HookNotification.EntityCreateRequest(NIFI_USER, fileC),
+                new HookNotification.EntityCreateRequest(NIFI_USER, fileD),
+                new HookNotification.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME, "path1@test", newPath1Lineage)
+        );
+
+        final NiFiAtlasClient atlasClient = mock(NiFiAtlasClient.class);
+        sender.setAtlasClient(atlasClient);
+
+        // Existing nifi_flow_path
+        final AtlasEntity path1Entity = new AtlasEntity(TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME, "path1@test");
+        path1Entity.setGuid("path1-guid");
+        path1Entity.setAttribute(ATTR_INPUTS, singleton(createGuidReference("fs_path", "fileA-guid")));
+        path1Entity.setAttribute(ATTR_OUTPUTS, singleton(createGuidReference("fs_path", "fileB-guid")));
+
+        final AtlasEntity fileAEntity = new AtlasEntity("fs_path", ATTR_QUALIFIED_NAME, "file-a.txt@test");
+        fileAEntity.setGuid("fileA-guid");
+
+        // The GUID must match the one path1 refers to as its output and the one the mock below is keyed by.
+        final AtlasEntity fileBEntity = new AtlasEntity("fs_path", ATTR_QUALIFIED_NAME, "file-b.txt@test");
+        fileBEntity.setGuid("fileB-guid");
+
+        final AtlasEntity.AtlasEntityWithExtInfo path1Ext = new AtlasEntity.AtlasEntityWithExtInfo(path1Entity);
+        final AtlasEntity.AtlasEntityWithExtInfo fileAExt = new AtlasEntity.AtlasEntityWithExtInfo(fileAEntity);
+        final AtlasEntity.AtlasEntityWithExtInfo fileBExt = new AtlasEntity.AtlasEntityWithExtInfo(fileBEntity);
+        when(atlasClient.searchEntityDef(eq(new AtlasObjectId(TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME, "path1@test")))).thenReturn(path1Ext);
+        when(atlasClient.searchEntityDef(eq(new AtlasObjectId("fileA-guid")))).thenReturn(fileAExt);
+        when(atlasClient.searchEntityDef(eq(new AtlasObjectId("fileB-guid")))).thenReturn(fileBExt);
+
+        final Notifier notifier = new Notifier();
+        sender.send(messages, notifier);
+
+        assertCreateMessage(notifier, 0, fileC, fileD);
+        final Referenceable updatedPath1 = createRef(TYPE_NIFI_FLOW_PATH, "path1@test");
+        updatedPath1.set(ATTR_INPUTS, asList(new Referenceable("fileA-guid", "fs_path", Collections.emptyMap()), fileC));
+        updatedPath1.set(ATTR_OUTPUTS, asList(new Referenceable("fileB-guid", "fs_path", Collections.emptyMap()), fileD));
+        assertUpdateFlowPathMessage(notifier, 1, updatedPath1);
+    }
+
+}