NIFI-4971: ReportLineageToAtlas complete path can miss one-time lineages

- Separated Hook message de-duplication logic from NiFiAtlasHook to
NotificationSender
- NiFiAtlasHook used to send individual CREATE_ENTITY messages for each entities,
this commit changed it to bundle all new entities into a single
CREATE_ENTITY to preserve entity creation order, so that new DataSet
entities can be referred from new nifi_flow_path entities
- Added more unit tests

This closes #2542

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
Koji Kawamura 2018-03-09 09:27:20 +09:00 committed by Mike Thomsen
parent ed9263811b
commit 79a7014a95
6 changed files with 688 additions and 81 deletions

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.atlas.hook;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
import org.apache.nifi.atlas.NiFiAtlasClient;
import org.apache.nifi.atlas.provenance.lineage.LineageContext;
import java.util.ArrayList;
import java.util.List;
/**
* This class is not thread-safe as it holds uncommitted notification messages within instance.
* {@link #addMessage(HookNotificationMessage)} and {@link #commitMessages()} should be used serially from a single thread.
*/
public class NiFiAtlasHook extends AtlasHook implements LineageContext {
public static final String NIFI_USER = "nifi";
private static final String CONF_PREFIX = "atlas.hook.nifi.";
private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
private NiFiAtlasClient atlasClient;
public void setAtlasClient(NiFiAtlasClient atlasClient) {
this.atlasClient = atlasClient;
}
@Override
protected String getNumberOfRetriesPropertyKey() {
return HOOK_NUM_RETRIES;
}
private final List<HookNotificationMessage> messages = new ArrayList<>();
@Override
public void addMessage(HookNotificationMessage message) {
messages.add(message);
}
public void commitMessages() {
final NotificationSender notificationSender = new NotificationSender();
notificationSender.setAtlasClient(atlasClient);
notificationSender.send(messages, this::notifyEntities);
}
public void close() {
if (notificationInterface != null) {
notificationInterface.close();
}
}
}

View File

@ -14,33 +14,42 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.atlas;
package org.apache.nifi.atlas.hook;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest;
import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.nifi.atlas.provenance.lineage.LineageContext;
import org.apache.nifi.atlas.AtlasUtils;
import org.apache.nifi.atlas.NiFiAtlasClient;
import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toMap;
import static org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_CREATE;
import static org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE;
import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName;
import static org.apache.nifi.atlas.hook.NiFiAtlasHook.NIFI_USER;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
@ -49,16 +58,12 @@ import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
/**
* This class is not thread-safe as it holds uncommitted notification messages within instance.
* {@link #addMessage(HookNotificationMessage)} and {@link #commitMessages()} should be used serially from a single thread.
* This class implements Atlas hook notification message deduplication mechanism.
* Separated from {@link NiFiAtlasHook} for better testability.
*/
public class NiFiAtlasHook extends AtlasHook implements LineageContext {
class NotificationSender {
public static final String NIFI_USER = "nifi";
private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasHook.class);
private static final String CONF_PREFIX = "atlas.hook.nifi.";
private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
private static final Logger logger = LoggerFactory.getLogger(NotificationSender.class);
private NiFiAtlasClient atlasClient;
@ -71,7 +76,6 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext {
*/
private final Map<String, Referenceable> typedQualifiedNameToRef;
private static <K, V> Map<K, V> createCache(final int maxSize) {
return new LinkedHashMap<K, V>(maxSize, 0.75f, true) {
@Override
@ -81,7 +85,7 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext {
};
}
public NiFiAtlasHook() {
NotificationSender() {
final int qualifiedNameCacheSize = 10_000;
this.guidToQualifiedName = createCache(qualifiedNameCacheSize);
@ -89,70 +93,160 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext {
this.typedQualifiedNameToRef = createCache(dataSetRefCacheSize);
}
public void setAtlasClient(NiFiAtlasClient atlasClient) {
this.atlasClient = atlasClient;
}
@Override
protected String getNumberOfRetriesPropertyKey() {
return HOOK_NUM_RETRIES;
}
private final List<HookNotificationMessage> messages = new ArrayList<>();
@Override
public void addMessage(HookNotificationMessage message) {
messages.add(message);
}
private class Metrics {
final long startedAt = System.currentTimeMillis();
/**
* The total number of messages passed to commitMessages.
*/
int totalMessages;
/**
* The number of CreateEntityRequest messages before de-duplication.
*/
int createMessages;
/**
* The number of unique CreateEntityRequest messages for 'nifi_flow_path'.
*/
int uniqueFlowPathCreates;
/**
* The number of unique CreateEntityRequest messages except 'nifi_flow_path'.
*/
int uniqueOtherCreates;
int partialNiFiFlowPathUpdates;
int dedupedPartialNiFiFlowPathUpdates;
int uniquePartialNiFiFlowPathUpdates;
int otherMessages;
int flowPathSearched;
int dataSetSearched;
int dataSetCacheHit;
private void log(String message) {
logger.debug(String.format("%s, %d ms passed, totalMessages=%d," +
" partialNiFiFlowPathUpdates=%d, dedupedPartialNiFiFlowPathUpdates=%d, otherMessage=%d," +
" flowPathSearched=%d, dataSetSearched=%d, dataSetCacheHit=%s," +
" guidToQualifiedName.size=%d, typedQualifiedNameToRef.size=%d",
private String toLogString(String message) {
return String.format("%s, %d ms passed, totalMessages=%d," +
" createMessages=%d, uniqueFlowPathCreates=%d, uniqueOtherCreates=%d," +
" partialNiFiFlowPathUpdates=%d, uniquePartialNiFiFlowPathUpdates=%d, otherMessage=%d," +
" flowPathSearched=%d, dataSetSearched=%d, dataSetCacheHit=%s," +
" guidToQualifiedName.size=%d, typedQualifiedNameToRef.size=%d",
message, System.currentTimeMillis() - startedAt, totalMessages,
partialNiFiFlowPathUpdates, dedupedPartialNiFiFlowPathUpdates, otherMessages,
createMessages, uniqueFlowPathCreates, uniqueOtherCreates,
partialNiFiFlowPathUpdates, uniquePartialNiFiFlowPathUpdates, otherMessages,
flowPathSearched, dataSetSearched, dataSetCacheHit,
guidToQualifiedName.size(), typedQualifiedNameToRef.size()));
guidToQualifiedName.size(), typedQualifiedNameToRef.size());
}
}
public void commitMessages() {
final Map<Boolean, List<HookNotificationMessage>> partialNiFiFlowPathUpdateAndOthers
= messages.stream().collect(Collectors.groupingBy(msg
-> ENTITY_PARTIAL_UPDATE.equals(msg.getType())
&& TYPE_NIFI_FLOW_PATH.equals(((EntityPartialUpdateRequest)msg).getTypeName())
&& ATTR_QUALIFIED_NAME.equals(((EntityPartialUpdateRequest)msg).getAttribute())
));
void setAtlasClient(NiFiAtlasClient atlasClient) {
this.atlasClient = atlasClient;
}
private Predicate<Referenceable> distinctReferenceable() {
final Set<String> keys = new HashSet<>();
return r -> {
final String key = AtlasUtils.toTypedQualifiedName(r.getTypeName(), (String) r.get(ATTR_QUALIFIED_NAME));
return keys.add(key);
};
}
final List<HookNotificationMessage> otherMessages = partialNiFiFlowPathUpdateAndOthers.computeIfAbsent(false, k -> Collections.emptyList());
final List<HookNotificationMessage> partialNiFiFlowPathUpdates = partialNiFiFlowPathUpdateAndOthers.computeIfAbsent(true, k -> Collections.emptyList());
logger.info("Commit messages: {} partialNiFiFlowPathUpdate and {} other messages.", partialNiFiFlowPathUpdates.size(), otherMessages.size());
private <K, V> List<V> safeGet(Map<K, List<V>> map, K key) {
return map.computeIfAbsent(key, k -> Collections.emptyList());
}
@SuppressWarnings("unchecked")
private void mergeRefs(Referenceable r1, Referenceable r2) {
r1.set(ATTR_INPUTS, mergeRefs((Collection<Referenceable>) r1.get(ATTR_INPUTS), (Collection<Referenceable>) r2.get(ATTR_INPUTS)));
r1.set(ATTR_OUTPUTS, mergeRefs((Collection<Referenceable>) r1.get(ATTR_OUTPUTS), (Collection<Referenceable>) r2.get(ATTR_OUTPUTS)));
}
private Collection<Referenceable> mergeRefs(Collection<Referenceable> r1, Collection<Referenceable> r2) {
final boolean isR1Empty = r1 == null || r1.isEmpty();
final boolean isR2Empty = r2 == null || r2.isEmpty();
if (isR1Empty) {
// r2 may or may not have entities, don't have to merge r1.
return r2;
} else if (isR2Empty) {
// r1 has some entities, don't have to merge r2.
return r1;
}
// If both have entities, then need to be merged.
return Stream.concat(r1.stream(), r2.stream()).filter(distinctReferenceable()).collect(Collectors.toList());
}
/**
* <p>Send hook notification messages.
* In order to notify relationships between 'nifi_flow_path' and its inputs/outputs, this method sends messages in following order:</p>
* <ol>
* <li>As a a single {@link org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest} message:
* <ul>
* <li>New entities except 'nifi_flow_path', including DataSets such as 'nifi_queue', 'kafka_topic' or 'hive_table' ... etc,
* so that 'nifi_flow_path' can refer</li>
* <li>New 'nifi_flow_path' entities, entities order is guaranteed in a single message</li>
* </ul>
* </li>
* <li>Update 'nifi_flow_path' messages, before notifying update messages, this method fetches existing 'nifi_flow_path' entity
* to merge new inputs/outputs element with existing ones, so that existing ones will not be removed.</li>
* <li>Other messages except</li>
* </ol>
* <p>Messages having the same type and qualified name will be de-duplicated before being sent.</p>
* @param messages list of messages to be sent
* @param notifier responsible for sending notification messages, its accept method can be called multiple times
*/
void send(final List<HookNotification.HookNotificationMessage> messages, final Consumer<List<HookNotification.HookNotificationMessage>> notifier) {
final Metrics metrics = new Metrics();
metrics.totalMessages = messages.size();
metrics.partialNiFiFlowPathUpdates = partialNiFiFlowPathUpdates.size();
metrics.otherMessages = otherMessages.size();
try {
// Notify other messages first.
notifyEntities(otherMessages);
metrics.totalMessages = messages.size();
// De-duplicate messages.
final List<HookNotificationMessage> deduplicatedMessages = partialNiFiFlowPathUpdates.stream().map(msg -> (EntityPartialUpdateRequest) msg)
final Map<Boolean, List<HookNotification.HookNotificationMessage>> createAndOthers = messages.stream().collect(groupingBy(msg -> ENTITY_CREATE.equals(msg.getType())));
final List<HookNotification.HookNotificationMessage> creates = safeGet(createAndOthers, true);
metrics.createMessages = creates.size();
final Map<Boolean, List<Referenceable>> newFlowPathsAndOtherEntities = creates.stream()
.flatMap(msg -> ((HookNotification.EntityCreateRequest) msg).getEntities().stream())
.collect(groupingBy(ref -> TYPE_NIFI_FLOW_PATH.equals(ref.typeName)));
// Deduplicate same entity creation messages.
final List<Referenceable> newEntitiesExceptFlowPaths = safeGet(newFlowPathsAndOtherEntities, false)
.stream().filter(distinctReferenceable()).collect(Collectors.toList());
// Deduplicate same flow paths and also merge inputs and outputs
final Collection<Referenceable> newFlowPaths = safeGet(newFlowPathsAndOtherEntities, true).stream()
.collect(toMap(ref -> ref.get(ATTR_QUALIFIED_NAME), ref -> ref, (r1, r2) -> {
// Merge inputs and outputs.
mergeRefs(r1, r2);
return r1;
})).values();
metrics.uniqueFlowPathCreates = newFlowPaths.size();
metrics.uniqueOtherCreates = newEntitiesExceptFlowPaths.size();
// 1-1. Notify new entities except 'nifi_flow_path'
// 1-2. Notify new 'nifi_flow_path'
List<Referenceable> newEntities = new ArrayList<>();
newEntities.addAll(newEntitiesExceptFlowPaths);
newEntities.addAll(newFlowPaths);
if (!newEntities.isEmpty()) {
notifier.accept(Collections.singletonList(new HookNotification.EntityCreateRequest(NIFI_USER, newEntities)));
}
final Map<Boolean, List<HookNotification.HookNotificationMessage>> partialNiFiFlowPathUpdateAndOthers
= safeGet(createAndOthers, false).stream().collect(groupingBy(msg
-> ENTITY_PARTIAL_UPDATE.equals(msg.getType())
&& TYPE_NIFI_FLOW_PATH.equals(((HookNotification.EntityPartialUpdateRequest)msg).getTypeName())
&& ATTR_QUALIFIED_NAME.equals(((HookNotification.EntityPartialUpdateRequest)msg).getAttribute())
));
// These updates are made against existing flow path entities.
final List<HookNotification.HookNotificationMessage> partialNiFiFlowPathUpdates = safeGet(partialNiFiFlowPathUpdateAndOthers, true);
final List<HookNotification.HookNotificationMessage> otherMessages = safeGet(partialNiFiFlowPathUpdateAndOthers, false);
metrics.partialNiFiFlowPathUpdates = partialNiFiFlowPathUpdates.size();
metrics.otherMessages = otherMessages.size();
// 2. Notify de-duplicated 'nifi_flow_path' updates
final List<HookNotification.HookNotificationMessage> deduplicatedMessages = partialNiFiFlowPathUpdates.stream().map(msg -> (HookNotification.EntityPartialUpdateRequest) msg)
// Group by nifi_flow_path qualifiedName value.
.collect(Collectors.groupingBy(EntityPartialUpdateRequest::getAttributeValue)).entrySet().stream()
.collect(groupingBy(HookNotification.EntityPartialUpdateRequest::getAttributeValue)).entrySet().stream()
.map(entry -> {
final String flowPathQualifiedName = entry.getKey();
final Map<String, Referenceable> distinctInputs;
@ -178,7 +272,7 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext {
}
// Merge all inputs and outputs for this nifi_flow_path.
for (EntityPartialUpdateRequest msg : entry.getValue()) {
for (HookNotification.EntityPartialUpdateRequest msg : entry.getValue()) {
fromReferenceable(msg.getEntity().get(ATTR_INPUTS), metrics)
.entrySet().stream().filter(ref -> !distinctInputs.containsKey(ref.getKey()))
.forEach(ref -> distinctInputs.put(ref.getKey(), ref.getValue()));
@ -194,18 +288,22 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext {
// org.json4s.package$MappingException: Can't find ScalaSig for class org.apache.atlas.typesystem.Referenceable
flowPathRef.set(ATTR_INPUTS, new ArrayList<>(distinctInputs.values()));
flowPathRef.set(ATTR_OUTPUTS, new ArrayList<>(distinctOutputs.values()));
return new EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH,
return new HookNotification.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH,
ATTR_QUALIFIED_NAME, flowPathQualifiedName, flowPathRef);
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
metrics.dedupedPartialNiFiFlowPathUpdates = deduplicatedMessages.size();
notifyEntities(deduplicatedMessages);
metrics.uniquePartialNiFiFlowPathUpdates = deduplicatedMessages.size();
notifier.accept(deduplicatedMessages);
// 3. Notify other messages
notifier.accept(otherMessages);
} finally {
metrics.log("Committed");
messages.clear();
logger.info(metrics.toLogString("Finished"));
}
}
/**
@ -223,7 +321,7 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext {
return new HashMap<>();
}
final List<Map<String, Object>> refs = (List<Map<String, Object>>) _refs;
final Collection<Map<String, Object>> refs = (Collection<Map<String, Object>>) _refs;
return refs.stream().map(ref -> {
// Existing reference should has a GUID.
final String typeName = (String) ref.get(ATTR_TYPENAME);
@ -256,8 +354,8 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext {
return new Tuple<>(refQualifiedName, typedQualifiedNameToRef.get(toTypedQualifiedName(typeName, refQualifiedName)));
}).filter(Objects::nonNull).filter(tuple -> tuple.getValue() != null)
// If duplication happens, use new value.
.collect(Collectors.toMap(Tuple::getKey, Tuple::getValue, (oldValue, newValue) -> {
logger.warn("Duplicated qualified name was found, use the new one. oldValue={}, newValue={}", new Object[]{oldValue, newValue});
.collect(toMap(Tuple::getKey, Tuple::getValue, (oldValue, newValue) -> {
logger.debug("Duplicated qualified name was found, use the new one. oldValue={}, newValue={}", new Object[]{oldValue, newValue});
return newValue;
}));
}
@ -268,7 +366,7 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext {
return Collections.emptyMap();
}
final List<Referenceable> refs = (List<Referenceable>) _refs;
final Collection<Referenceable> refs = (Collection<Referenceable>) _refs;
return refs.stream().map(ref -> {
// This ref is created within this reporting cycle, and it may not have GUID assigned yet, if it is a brand new reference.
// If cache has the Reference, then use it because instances in the cache are guaranteed to have GUID assigned.
@ -288,13 +386,7 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext {
});
return new Tuple<>(refQualifiedName, refFromCacheIfAvailable);
}).filter(Objects::nonNull).filter(tuple -> tuple.getValue() != null)
.collect(Collectors.toMap(Tuple::getKey, Tuple::getValue));
}
public void close() {
if (notificationInterface != null) {
notificationInterface.close();
}
}).filter(tuple -> tuple.getValue() != null)
.collect(toMap(Tuple::getKey, Tuple::getValue));
}
}

View File

@ -40,7 +40,7 @@ import java.util.stream.Collectors;
import static org.apache.nifi.atlas.AtlasUtils.toStr;
import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName;
import static org.apache.nifi.atlas.NiFiAtlasHook.NIFI_USER;
import static org.apache.nifi.atlas.hook.NiFiAtlasHook.NIFI_USER;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_NIFI_FLOW;
@ -136,7 +136,7 @@ public abstract class AbstractLineageStrategy implements LineageStrategy {
}
@SuppressWarnings("unchecked")
private boolean addDataSetRefs(Set<Referenceable> refsToAdd, Referenceable nifiFlowPath, String targetAttribute) {
protected boolean addDataSetRefs(Set<Referenceable> refsToAdd, Referenceable nifiFlowPath, String targetAttribute) {
if (refsToAdd != null && !refsToAdd.isEmpty()) {
// If nifiFlowPath already has a given dataSetRef, then it needs not to be created.

View File

@ -42,7 +42,9 @@ import java.util.zip.CRC32;
import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName;
import static org.apache.nifi.atlas.AtlasUtils.toStr;
import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
import static org.apache.nifi.provenance.ProvenanceEventType.DROP;
@ -76,8 +78,36 @@ public class CompleteFlowPathLineage extends AbstractLineageStrategy {
createCompleteFlowPath(nifiFlow, lineagePath, createdFlowPaths);
for (Tuple<NiFiFlowPath, DataSetRefs> createdFlowPath : createdFlowPaths) {
final NiFiFlowPath flowPath = createdFlowPath.getKey();
createEntity(toReferenceable(flowPath, nifiFlow));
// NOTE 1: FlowPath creation and DataSet references should be reported separately
// ------------------------------------------------------------------------------
// For example, with following provenance event inputs:
// CREATE(F1), FORK (F1 -> F2, F3), DROP(F1), SEND (F2), SEND(F3), DROP(F2), DROP(F3),
// there is no guarantee that DROP(F2) and DROP(F3) are processed within the same cycle.
// If DROP(F3) is processed in different cycle, it needs to be added to the existing FlowPath
// that contains F1 -> F2, to be F1 -> F2, F3.
// Execution cycle 1: Path1 (source of F1 -> ForkA), ForkA_queue (F1 -> F2), Path2 (ForkA -> dest of F2)
// Execution cycle 2: Path1 (source of F1 -> ForkB), ForkB_queue (F1 -> F3), Path3 (ForkB -> dest of F3)
// NOTE 2: Both FlowPath creation and FlowPath update messages are required
// ------------------------------------------------------------------------
// For the 1st time when a lineage is found, nifi_flow_path and referred DataSets are created.
// If we notify these entities by a create 3 entities message (Path1, DataSet1, DataSet2)
// followed by 1 partial update message to add lineage (DataSet1 -> Path1 -> DataSet2), then
// the update message may arrive at Atlas earlier than the create message gets processed.
// If that happens, lineage among these entities will be missed.
// But as written in NOTE1, there is a case where existing nifi_flow_paths need to be updated.
// Also, we don't know if this is the 1st time or 2nd or later.
// So, we need to notify entity creation and also partial update messages.
// Create flow path entity with DataSet refs.
final Referenceable flowPathRef = toReferenceable(flowPath, nifiFlow);
final DataSetRefs dataSetRefs = createdFlowPath.getValue();
addDataSetRefs(dataSetRefs.getInputs(), flowPathRef, ATTR_INPUTS);
addDataSetRefs(dataSetRefs.getOutputs(), flowPathRef, ATTR_OUTPUTS);
createEntity(flowPathRef);
// Also, sending partial update message to update existing flow_path.
addDataSetRefs(nifiFlow, Collections.singleton(flowPath), createdFlowPath.getValue());
}
createdFlowPaths.clear();
}

View File

@ -56,8 +56,8 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.atlas.hook.NiFiAtlasHook;
import org.apache.nifi.atlas.NiFiAtlasClient;
import org.apache.nifi.atlas.NiFiAtlasHook;
import org.apache.nifi.atlas.NiFiFlow;
import org.apache.nifi.atlas.NiFiFlowAnalyzer;
import org.apache.nifi.atlas.provenance.AnalysisContext;

View File

@ -0,0 +1,417 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.atlas.hook;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.nifi.atlas.AtlasUtils;
import org.apache.nifi.atlas.NiFiAtlasClient;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_DATASET;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
import static org.apache.nifi.atlas.hook.NiFiAtlasHook.NIFI_USER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestNotificationSender {
private static final Logger logger = LoggerFactory.getLogger(TestNotificationSender.class);
private static class Notifier implements Consumer<List<HookNotification.HookNotificationMessage>> {
private final List<List<HookNotification.HookNotificationMessage>> notifications = new ArrayList<>();
@Override
public void accept(List<HookNotification.HookNotificationMessage> messages) {
logger.info("notified at {}, {}", notifications.size(), messages);
notifications.add(messages);
}
}
@Test
public void testZeroMessage() {
final NotificationSender sender = new NotificationSender();
final List<HookNotification.HookNotificationMessage> messages = Collections.emptyList();
final Notifier notifier = new Notifier();
sender.send(messages, notifier);
assertEquals(0, notifier.notifications.get(0).size());
assertEquals(0, notifier.notifications.get(1).size());
}
private Referenceable createRef(String type, String qname) {
final Referenceable ref = new Referenceable(type);
ref.set(ATTR_QUALIFIED_NAME, qname);
return ref;
}
@SuppressWarnings("unchecked")
private void assertCreateMessage(Notifier notifier, int notificationIndex, Referenceable ... expects) {
assertTrue(notifier.notifications.size() > notificationIndex);
final List<HookNotification.HookNotificationMessage> messages = notifier.notifications.get(notificationIndex);
assertEquals(1, messages.size());
final HookNotification.EntityCreateRequest message = (HookNotification.EntityCreateRequest) messages.get(0);
assertEquals(expects.length, message.getEntities().size());
// The use of 'flatMap' at NotificationSender does not preserve actual entities order.
// Use typed qname map to assert regardless of ordering.
final Map<String, Referenceable> entities = message.getEntities().stream().collect(Collectors.toMap(
ref -> AtlasUtils.toTypedQualifiedName(ref.getTypeName(), (String) ref.get(ATTR_QUALIFIED_NAME)), ref -> ref));
boolean hasFlowPathSeen = false;
for (int i = 0; i < expects.length; i++) {
final Referenceable expect = expects[i];
final String typeName = expect.getTypeName();
final Referenceable actual = entities.get(AtlasUtils.toTypedQualifiedName(typeName, (String) expect.get(ATTR_QUALIFIED_NAME)));
assertNotNull(actual);
assertEquals(typeName, actual.getTypeName());
assertEquals(expect.get(ATTR_QUALIFIED_NAME), actual.get(ATTR_QUALIFIED_NAME));
if (TYPE_NIFI_FLOW_PATH.equals(typeName)) {
assertIOReferences(expect, actual, ATTR_INPUTS);
assertIOReferences(expect, actual, ATTR_OUTPUTS);
hasFlowPathSeen = true;
} else {
assertFalse("Types other than nifi_flow_path should be created before any nifi_flow_path entity.", hasFlowPathSeen);
}
}
}
@SuppressWarnings("unchecked")
private void assertIOReferences(Referenceable expect, Referenceable actual, String attrName) {
final Collection<Referenceable> expectedRefs = (Collection<Referenceable>) expect.get(attrName);
if (expectedRefs != null) {
final Collection<Referenceable> actualRefs = (Collection<Referenceable>) actual.get(attrName);
assertEquals(expectedRefs.size(), actualRefs.size());
final Iterator<Referenceable> actualIterator = actualRefs.iterator();
for (Referenceable expectedRef : expectedRefs) {
final Referenceable actualRef = actualIterator.next();
assertEquals(expectedRef.getTypeName(), actualRef.getTypeName());
assertEquals(expectedRef.get(ATTR_QUALIFIED_NAME), actualRef.get(ATTR_QUALIFIED_NAME));
}
}
}
@SuppressWarnings("unchecked")
private void assertUpdateFlowPathMessage(Notifier notifier, int notificationIndex, Referenceable ... expects) {
assertTrue(notifier.notifications.size() > notificationIndex);
final List<HookNotification.HookNotificationMessage> messages = notifier.notifications.get(notificationIndex);
assertEquals(expects.length, messages.size());
for (int i = 0; i < expects.length; i++) {
final Referenceable expect = expects[i];
final HookNotification.EntityPartialUpdateRequest actual = (HookNotification.EntityPartialUpdateRequest) messages.get(i);
assertEquals(expect.getTypeName(), actual.getTypeName());
assertEquals(ATTR_QUALIFIED_NAME, actual.getAttribute());
assertEquals(expect.get(ATTR_QUALIFIED_NAME), actual.getAttributeValue());
final Collection expIn = (Collection) expect.get(ATTR_INPUTS);
final Collection expOut = (Collection) expect.get(ATTR_OUTPUTS);
assertTrue(expIn.containsAll((Collection) actual.getEntity().get(ATTR_INPUTS)));
assertTrue(expOut.containsAll((Collection) actual.getEntity().get(ATTR_OUTPUTS)));
}
}
@Test
public void testOneCreateDataSetMessage() {
final NotificationSender sender = new NotificationSender();
final Referenceable queue1 = createRef(TYPE_NIFI_QUEUE, "queue1@test");
final List<HookNotification.HookNotificationMessage> messages = Collections.singletonList(
new HookNotification.EntityCreateRequest(NIFI_USER, queue1));
final Notifier notifier = new Notifier();
sender.send(messages, notifier);
// EntityCreateRequest containing nifi_queue.
assertCreateMessage(notifier, 0, queue1);
}
@Test
public void testCreateDataSetMessageDeduplication() {
// Simulating complete path, that there were 4 FlowFiles went through the same partial flow.
// FF1 was ingested from data1, and so was FF2 from data2.
// Then FF1 was FORKed to FF11 and FF12, then sent to data11 and data12.
// Similarly FF2 was FORKed to FF21 and FF22, then sent to data21 and data22.
// FF3 went through the same as FF1, so did FF4 as FF2.
// All of those provenance events were processed within a single ReportLineageToAtlas cycle.
// FF1: data1 -> pathA1 -> FORK11 (FF11) -> pathB11 -> data11
// -> FORK12 (FF12) -> pathB12 -> data12
// FF2: data2 -> pathA2 -> FORK21 (FF21) -> pathB21 -> data21
// -> FORK22 (FF22) -> pathB22 -> data22
// FF3: data1 -> pathA1 -> FORK11 (FF31) -> pathB11 -> data11
// -> FORK12 (FF32) -> pathB12 -> data12
// FF4: data2 -> pathA2 -> FORK21 (FF41) -> pathB21 -> data21
// -> FORK22 (FF42) -> pathB22 -> data22
// As a result, following lineages are reported to Atlas:
// data1 -> pathA1 -> FORK11 -> pathB11 -> data11
// -> FORK12 -> pathB12 -> data12
// data2 -> pathA2 -> FORK21 -> pathB21 -> data21
// -> FORK22 -> pathB22 -> data22
final NotificationSender sender = new NotificationSender();
// From FF1
final Referenceable ff1_data1 = createRef(TYPE_DATASET, "data1@test");
final Referenceable ff1_data11 = createRef(TYPE_DATASET, "data11@test");
final Referenceable ff1_data12 = createRef(TYPE_DATASET, "data12@test");
final Referenceable ff1_pathA11 = createRef(TYPE_NIFI_FLOW_PATH, "A::0001@test");
final Referenceable ff1_pathA12 = createRef(TYPE_NIFI_FLOW_PATH, "A::0001@test");
final Referenceable ff1_fork11 = createRef(TYPE_NIFI_QUEUE, "B::0011@test");
final Referenceable ff1_fork12 = createRef(TYPE_NIFI_QUEUE, "B::0012@test");
final Referenceable ff1_pathB11 = createRef(TYPE_NIFI_FLOW_PATH, "B::0011@test");
final Referenceable ff1_pathB12 = createRef(TYPE_NIFI_FLOW_PATH, "B::0012@test");
// From FF11
ff1_pathA11.set(ATTR_INPUTS, singleton(ff1_data1));
ff1_pathA11.set(ATTR_OUTPUTS, singleton(ff1_fork11));
ff1_pathB11.set(ATTR_INPUTS, singleton(ff1_fork11));
ff1_pathB11.set(ATTR_OUTPUTS, singleton(ff1_data11));
// From FF12
ff1_pathA12.set(ATTR_INPUTS, singleton(ff1_data1));
ff1_pathA12.set(ATTR_OUTPUTS, singleton(ff1_fork12));
ff1_pathB12.set(ATTR_INPUTS, singleton(ff1_fork12));
ff1_pathB12.set(ATTR_OUTPUTS, singleton(ff1_data12));
// From FF2
final Referenceable ff2_data2 = createRef(TYPE_DATASET, "data2@test");
final Referenceable ff2_data21 = createRef(TYPE_DATASET, "data21@test");
final Referenceable ff2_data22 = createRef(TYPE_DATASET, "data22@test");
final Referenceable ff2_pathA21 = createRef(TYPE_NIFI_FLOW_PATH, "A::0002@test");
final Referenceable ff2_pathA22 = createRef(TYPE_NIFI_FLOW_PATH, "A::0002@test");
final Referenceable ff2_fork21 = createRef(TYPE_NIFI_QUEUE, "B::0021@test");
final Referenceable ff2_fork22 = createRef(TYPE_NIFI_QUEUE, "B::0022@test");
final Referenceable ff2_pathB21 = createRef(TYPE_NIFI_FLOW_PATH, "B::0021@test");
final Referenceable ff2_pathB22 = createRef(TYPE_NIFI_FLOW_PATH, "B::0022@test");
// From FF21
ff2_pathA21.set(ATTR_INPUTS, singleton(ff2_data2));
ff2_pathA21.set(ATTR_OUTPUTS, singleton(ff2_fork21));
ff2_pathB21.set(ATTR_INPUTS, singleton(ff2_fork21));
ff2_pathB21.set(ATTR_OUTPUTS, singleton(ff2_data21));
// From FF22
ff2_pathA22.set(ATTR_INPUTS, singleton(ff2_data2));
ff2_pathA22.set(ATTR_OUTPUTS, singleton(ff2_fork22));
ff2_pathB22.set(ATTR_INPUTS, singleton(ff2_fork22));
ff2_pathB22.set(ATTR_OUTPUTS, singleton(ff2_data22));
// From FF3
final Referenceable ff3_data1 = createRef(TYPE_DATASET, "data1@test");
final Referenceable ff3_data11 = createRef(TYPE_DATASET, "data11@test");
final Referenceable ff3_data12 = createRef(TYPE_DATASET, "data12@test");
final Referenceable ff3_pathA11 = createRef(TYPE_NIFI_FLOW_PATH, "A::0001@test");
final Referenceable ff3_pathA12 = createRef(TYPE_NIFI_FLOW_PATH, "A::0001@test");
final Referenceable ff3_fork11 = createRef(TYPE_NIFI_QUEUE, "B::0011@test");
final Referenceable ff3_fork12 = createRef(TYPE_NIFI_QUEUE, "B::0012@test");
final Referenceable ff3_pathB11 = createRef(TYPE_NIFI_FLOW_PATH, "B::0011@test");
final Referenceable ff3_pathB12 = createRef(TYPE_NIFI_FLOW_PATH, "B::0012@test");
// From FF31
ff3_pathA11.set(ATTR_INPUTS, singleton(ff3_data1));
ff3_pathA11.set(ATTR_OUTPUTS, singleton(ff3_fork11));
ff3_pathB11.set(ATTR_INPUTS, singleton(ff3_fork11));
ff3_pathB11.set(ATTR_OUTPUTS, singleton(ff3_data11));
// From FF32
ff3_pathA12.set(ATTR_INPUTS, singleton(ff3_data1));
ff3_pathA12.set(ATTR_OUTPUTS, singleton(ff3_fork12));
ff3_pathB12.set(ATTR_INPUTS, singleton(ff3_fork12));
ff3_pathB12.set(ATTR_OUTPUTS, singleton(ff3_data12));
// From FF4
final Referenceable ff4_data2 = createRef(TYPE_DATASET, "data2@test");
final Referenceable ff4_data21 = createRef(TYPE_DATASET, "data21@test");
final Referenceable ff4_data22 = createRef(TYPE_DATASET, "data22@test");
final Referenceable ff4_pathA21 = createRef(TYPE_NIFI_FLOW_PATH, "A::0002@test");
final Referenceable ff4_pathA22 = createRef(TYPE_NIFI_FLOW_PATH, "A::0002@test");
final Referenceable ff4_fork21 = createRef(TYPE_NIFI_QUEUE, "B::0021@test");
final Referenceable ff4_fork22 = createRef(TYPE_NIFI_QUEUE, "B::0022@test");
final Referenceable ff4_pathB21 = createRef(TYPE_NIFI_FLOW_PATH, "B::0021@test");
final Referenceable ff4_pathB22 = createRef(TYPE_NIFI_FLOW_PATH, "B::0022@test");
// From FF41
ff4_pathA21.set(ATTR_INPUTS, singleton(ff4_data2));
ff4_pathA21.set(ATTR_OUTPUTS, singleton(ff4_fork21));
ff4_pathB21.set(ATTR_INPUTS, singleton(ff4_fork21));
ff4_pathB21.set(ATTR_OUTPUTS, singleton(ff4_data21));
// From FF42
ff4_pathA22.set(ATTR_INPUTS, singleton(ff4_data2));
ff4_pathA22.set(ATTR_OUTPUTS, singleton(ff4_fork22));
ff4_pathB22.set(ATTR_INPUTS, singleton(ff4_fork22));
ff4_pathB22.set(ATTR_OUTPUTS, singleton(ff4_data22));
final List<HookNotification.HookNotificationMessage> messages = asList(
new HookNotification.EntityCreateRequest(NIFI_USER, ff1_data1),
new HookNotification.EntityCreateRequest(NIFI_USER, ff1_data11),
new HookNotification.EntityCreateRequest(NIFI_USER, ff1_data12),
new HookNotification.EntityCreateRequest(NIFI_USER, ff1_pathA11),
new HookNotification.EntityCreateRequest(NIFI_USER, ff1_pathA12),
new HookNotification.EntityCreateRequest(NIFI_USER, ff1_fork11),
new HookNotification.EntityCreateRequest(NIFI_USER, ff1_fork12),
new HookNotification.EntityCreateRequest(NIFI_USER, ff1_pathB11),
new HookNotification.EntityCreateRequest(NIFI_USER, ff1_pathB12),
new HookNotification.EntityCreateRequest(NIFI_USER, ff2_data2),
new HookNotification.EntityCreateRequest(NIFI_USER, ff2_data21),
new HookNotification.EntityCreateRequest(NIFI_USER, ff2_data22),
new HookNotification.EntityCreateRequest(NIFI_USER, ff2_pathA21),
new HookNotification.EntityCreateRequest(NIFI_USER, ff2_pathA22),
new HookNotification.EntityCreateRequest(NIFI_USER, ff2_fork21),
new HookNotification.EntityCreateRequest(NIFI_USER, ff2_fork22),
new HookNotification.EntityCreateRequest(NIFI_USER, ff2_pathB21),
new HookNotification.EntityCreateRequest(NIFI_USER, ff2_pathB22),
new HookNotification.EntityCreateRequest(NIFI_USER, ff3_data1),
new HookNotification.EntityCreateRequest(NIFI_USER, ff3_data11),
new HookNotification.EntityCreateRequest(NIFI_USER, ff3_data12),
new HookNotification.EntityCreateRequest(NIFI_USER, ff3_pathA11),
new HookNotification.EntityCreateRequest(NIFI_USER, ff3_pathA12),
new HookNotification.EntityCreateRequest(NIFI_USER, ff3_fork11),
new HookNotification.EntityCreateRequest(NIFI_USER, ff3_fork12),
new HookNotification.EntityCreateRequest(NIFI_USER, ff3_pathB11),
new HookNotification.EntityCreateRequest(NIFI_USER, ff3_pathB12),
new HookNotification.EntityCreateRequest(NIFI_USER, ff4_data2),
new HookNotification.EntityCreateRequest(NIFI_USER, ff4_data21),
new HookNotification.EntityCreateRequest(NIFI_USER, ff4_data22),
new HookNotification.EntityCreateRequest(NIFI_USER, ff4_pathA21),
new HookNotification.EntityCreateRequest(NIFI_USER, ff4_pathA22),
new HookNotification.EntityCreateRequest(NIFI_USER, ff4_fork21),
new HookNotification.EntityCreateRequest(NIFI_USER, ff4_fork22),
new HookNotification.EntityCreateRequest(NIFI_USER, ff4_pathB21),
new HookNotification.EntityCreateRequest(NIFI_USER, ff4_pathB22)
);
final Notifier notifier = new Notifier();
sender.send(messages, notifier);
// EntityCreateRequest, same entities get de-duplicated. nifi_flow_path is created after other types.
final Referenceable r_data1 = createRef(TYPE_DATASET, "data1@test");
final Referenceable r_data11 = createRef(TYPE_DATASET, "data11@test");
final Referenceable r_data12 = createRef(TYPE_DATASET, "data12@test");
final Referenceable r_pathA1 = createRef(TYPE_NIFI_FLOW_PATH, "A::0001@test");
final Referenceable r_fork11 = createRef(TYPE_NIFI_QUEUE, "B::0011@test");
final Referenceable r_fork12 = createRef(TYPE_NIFI_QUEUE, "B::0012@test");
final Referenceable r_pathB11 = createRef(TYPE_NIFI_FLOW_PATH, "B::0011@test");
final Referenceable r_pathB12 = createRef(TYPE_NIFI_FLOW_PATH, "B::0012@test");
r_pathA1.set(ATTR_INPUTS, singleton(r_data1));
r_pathA1.set(ATTR_OUTPUTS, asList(r_fork11, r_fork12));
r_pathB11.set(ATTR_INPUTS, singleton(r_fork11));
r_pathB11.set(ATTR_OUTPUTS, singleton(r_data11));
r_pathB12.set(ATTR_INPUTS, singleton(r_fork12));
r_pathB12.set(ATTR_OUTPUTS, singleton(r_data12));
final Referenceable r_data2 = createRef(TYPE_DATASET, "data2@test");
final Referenceable r_data21 = createRef(TYPE_DATASET, "data21@test");
final Referenceable r_data22 = createRef(TYPE_DATASET, "data22@test");
final Referenceable r_pathA2 = createRef(TYPE_NIFI_FLOW_PATH, "A::0002@test");
final Referenceable r_fork21 = createRef(TYPE_NIFI_QUEUE, "B::0021@test");
final Referenceable r_fork22 = createRef(TYPE_NIFI_QUEUE, "B::0022@test");
final Referenceable r_pathB21 = createRef(TYPE_NIFI_FLOW_PATH, "B::0021@test");
final Referenceable r_pathB22 = createRef(TYPE_NIFI_FLOW_PATH, "B::0022@test");
r_pathA2.set(ATTR_INPUTS, singleton(r_data2));
r_pathA2.set(ATTR_OUTPUTS, asList(r_fork21, r_fork22));
r_pathB21.set(ATTR_INPUTS, singleton(r_fork21));
r_pathB21.set(ATTR_OUTPUTS, singleton(r_data21));
r_pathB22.set(ATTR_INPUTS, singleton(r_fork22));
r_pathB22.set(ATTR_OUTPUTS, singleton(r_data22));
assertCreateMessage(notifier, 0,
r_data1, r_data11, r_data12, r_fork11, r_fork12,
r_data2, r_data21, r_data22, r_fork21, r_fork22,
r_pathA1, r_pathB11, r_pathB12,
r_pathA2, r_pathB21, r_pathB22);
}
private Map<String, String> createGuidReference(String type, String guid) {
Map<String, String> map = new HashMap<>();
map.put(ATTR_TYPENAME, type);
map.put(ATTR_GUID, guid);
return map;
}
@Test
public void testUpdateFlowPath() throws AtlasServiceException {
final NotificationSender sender = new NotificationSender();
final Referenceable fileC = createRef("fs_path", "/tmp/file-c.txt@test");
final Referenceable fileD = createRef("fs_path", "/tmp/file-d.txt@test");
// New in/out fileC and fileD are found for path1.
final Referenceable newPath1Lineage = createRef(TYPE_NIFI_FLOW_PATH, "path1@test");
newPath1Lineage.set(ATTR_INPUTS, singleton(fileC));
newPath1Lineage.set(ATTR_OUTPUTS, singleton(fileD));
final List<HookNotification.HookNotificationMessage> messages = asList(
new HookNotification.EntityCreateRequest(NIFI_USER, fileC),
new HookNotification.EntityCreateRequest(NIFI_USER, fileD),
new HookNotification.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME, "path1@test", newPath1Lineage)
);
final NiFiAtlasClient atlasClient = mock(NiFiAtlasClient.class);
sender.setAtlasClient(atlasClient);
// Existing nifi_flow_path
final AtlasEntity path1Entity = new AtlasEntity(TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME, "path1@test");
path1Entity.setGuid("path1-guid");
path1Entity.setAttribute(ATTR_INPUTS, singleton(createGuidReference("fs_path", "fileA-guid")));
path1Entity.setAttribute(ATTR_OUTPUTS, singleton(createGuidReference("fs_path", "fileB-guid")));
final AtlasEntity fileAEntity = new AtlasEntity("fs_path", ATTR_QUALIFIED_NAME, "file-a.txt@test");
fileAEntity.setGuid("fileA-guid");
final AtlasEntity fileBEntity = new AtlasEntity("fs_path", ATTR_QUALIFIED_NAME, "file-b.txt@test");
fileBEntity.setGuid("fileA-guid");
final AtlasEntity.AtlasEntityWithExtInfo path1Ext = new AtlasEntity.AtlasEntityWithExtInfo(path1Entity);
final AtlasEntity.AtlasEntityWithExtInfo fileAExt = new AtlasEntity.AtlasEntityWithExtInfo(fileAEntity);
final AtlasEntity.AtlasEntityWithExtInfo fileBExt = new AtlasEntity.AtlasEntityWithExtInfo(fileBEntity);
when(atlasClient.searchEntityDef(eq(new AtlasObjectId(TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME,"path1@test")))).thenReturn(path1Ext);
when(atlasClient.searchEntityDef(eq(new AtlasObjectId("fileA-guid")))).thenReturn(fileAExt);
when(atlasClient.searchEntityDef(eq(new AtlasObjectId("fileB-guid")))).thenReturn(fileBExt);
final Notifier notifier = new Notifier();
sender.send(messages, notifier);
assertCreateMessage(notifier, 0, fileC, fileD);
final Referenceable updatedPath1 = createRef(TYPE_NIFI_FLOW_PATH, "path1@test");
updatedPath1.set(ATTR_INPUTS, asList(new Referenceable("fileA-guid", "fs_path", Collections.emptyMap()), fileC));
updatedPath1.set(ATTR_OUTPUTS, asList(new Referenceable("fileB-guid", "fs_path", Collections.emptyMap()), fileD));
assertUpdateFlowPathMessage(notifier, 1, updatedPath1);
}
}