NIFI-7009: Atlas reporting task retrieves only the active flow components

Filter out the deleted components before querying them, instead of retrieving
all the components before filtering.

This closes #3979
This commit is contained in:
Peter Turcsanyi 2020-01-04 22:48:44 +01:00 committed by Matt Gilman
parent 91e9e65a5c
commit ce050f4ecb
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
1 changed files with 34 additions and 4 deletions

View File

@ -193,6 +193,7 @@ public class NiFiAtlasClient {
} }
final AtlasEntity nifiFlowEntity = nifiFlowExt.getEntity(); final AtlasEntity nifiFlowEntity = nifiFlowExt.getEntity();
final Map<String, AtlasEntity> nifiFlowReferredEntities = nifiFlowExt.getReferredEntities();
final Map<String, Object> attributes = nifiFlowEntity.getAttributes(); final Map<String, Object> attributes = nifiFlowEntity.getAttributes();
final NiFiFlow nifiFlow = new NiFiFlow(rootProcessGroupId); final NiFiFlow nifiFlow = new NiFiFlow(rootProcessGroupId);
nifiFlow.setExEntity(nifiFlowEntity); nifiFlow.setExEntity(nifiFlowEntity);
@ -201,12 +202,12 @@ public class NiFiAtlasClient {
nifiFlow.setUrl(toStr(attributes.get(ATTR_URL))); nifiFlow.setUrl(toStr(attributes.get(ATTR_URL)));
nifiFlow.setDescription(toStr(attributes.get(ATTR_DESCRIPTION))); nifiFlow.setDescription(toStr(attributes.get(ATTR_DESCRIPTION)));
nifiFlow.getQueues().putAll(toQualifiedNameIds(toAtlasObjectIds(nifiFlowEntity.getAttribute(ATTR_QUEUES)))); nifiFlow.getQueues().putAll(fetchFlowComponents(TYPE_NIFI_QUEUE, nifiFlowReferredEntities));
nifiFlow.getRootInputPortEntities().putAll(toQualifiedNameIds(toAtlasObjectIds(nifiFlowEntity.getAttribute(ATTR_INPUT_PORTS)))); nifiFlow.getRootInputPortEntities().putAll(fetchFlowComponents(TYPE_NIFI_INPUT_PORT, nifiFlowReferredEntities));
nifiFlow.getRootOutputPortEntities().putAll(toQualifiedNameIds(toAtlasObjectIds(nifiFlowEntity.getAttribute(ATTR_OUTPUT_PORTS)))); nifiFlow.getRootOutputPortEntities().putAll(fetchFlowComponents(TYPE_NIFI_OUTPUT_PORT, nifiFlowReferredEntities));
final Map<String, NiFiFlowPath> flowPaths = nifiFlow.getFlowPaths(); final Map<String, NiFiFlowPath> flowPaths = nifiFlow.getFlowPaths();
final Map<AtlasObjectId, AtlasEntity> flowPathEntities = toQualifiedNameIds(toAtlasObjectIds(attributes.get(ATTR_FLOW_PATHS))); final Map<AtlasObjectId, AtlasEntity> flowPathEntities = fetchFlowComponents(TYPE_NIFI_FLOW_PATH, nifiFlowReferredEntities);
for (AtlasEntity flowPathEntity : flowPathEntities.values()) { for (AtlasEntity flowPathEntity : flowPathEntities.values()) {
final String pathQualifiedName = toStr(flowPathEntity.getAttribute(ATTR_QUALIFIED_NAME)); final String pathQualifiedName = toStr(flowPathEntity.getAttribute(ATTR_QUALIFIED_NAME));
@ -230,6 +231,35 @@ public class NiFiAtlasClient {
return nifiFlow; return nifiFlow;
} }
/**
* Retrieves the flow components of type {@code componentType} from Atlas server.
* Deleted components will be filtered out before calling Atlas.
* Atlas object ids will be initialized with all the attributes (guid, type, unique attributes) in order to be able
* to match ids retrieved from Atlas (having guid) and ids created by the reporting task (not having guid yet).
*
* @param componentType Atlas type of the flow component (nifi_flow_path, nifi_queue, nifi_input_port, nifi_output_port)
* @param referredEntities referred entities of the flow entity (returned when the flow fetched) containing the basic data (id, status) of the flow components
* @return flow component entities mapped to their object ids
*/
private Map<AtlasObjectId, AtlasEntity> fetchFlowComponents(String componentType, Map<String, AtlasEntity> referredEntities) {
return referredEntities.values().stream()
.filter(referredEntity -> referredEntity.getTypeName().equals(componentType))
.filter(referredEntity -> referredEntity.getStatus() == AtlasEntity.Status.ACTIVE)
.map(referredEntity -> {
final Map<String, Object> uniqueAttributes = Collections.singletonMap(ATTR_QUALIFIED_NAME, referredEntity.getAttribute(ATTR_QUALIFIED_NAME));
final AtlasObjectId id = new AtlasObjectId(referredEntity.getGuid(), componentType, uniqueAttributes);
try {
final AtlasEntity.AtlasEntityWithExtInfo fetchedEntityExt = searchEntityDef(id);
return new Tuple<>(id, fetchedEntityExt.getEntity());
} catch (AtlasServiceException e) {
logger.warn("Failed to search entity by id {}, due to {}", id, e);
return null;
}
})
.filter(Objects::nonNull)
.collect(Collectors.toMap(Tuple::getKey, Tuple::getValue));
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private List<AtlasObjectId> toAtlasObjectIds(Object _references) { private List<AtlasObjectId> toAtlasObjectIds(Object _references) {
if (_references == null) { if (_references == null) {