NIFI-6937 - NotificationSender uses typedQualifiedName instead of simple qualifiedName as keys in local maps.

NIFI-6937 - Fix NotificationSender: typedQualifiedName handling.

This closes #3929.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Tamas Palfy 2019-12-12 14:46:31 +01:00 committed by Peter Turcsanyi
parent e05d11c0b4
commit 599fb98415

View File

@ -70,7 +70,7 @@ class NotificationSender {
/**
* An index to resolve a qualifiedName from a GUID.
*/
private final Map<String, String> guidToQualifiedName;
private final Map<String, String> guidToTypedQualifiedName;
/**
* An index to resolve a Referenceable from a typeName::qualifiedName.
*/
@ -87,7 +87,7 @@ class NotificationSender {
NotificationSender() {
final int qualifiedNameCacheSize = 10_000;
this.guidToQualifiedName = createCache(qualifiedNameCacheSize);
this.guidToTypedQualifiedName = createCache(qualifiedNameCacheSize);
final int dataSetRefCacheSize = 1_000;
this.typedQualifiedNameToRef = createCache(dataSetRefCacheSize);
@ -129,7 +129,7 @@ class NotificationSender {
createMessages, uniqueFlowPathCreates, uniqueOtherCreates,
partialNiFiFlowPathUpdates, uniquePartialNiFiFlowPathUpdates, otherMessages,
flowPathSearched, dataSetSearched, dataSetCacheHit,
guidToQualifiedName.size(), typedQualifiedNameToRef.size());
guidToTypedQualifiedName.size(), typedQualifiedNameToRef.size());
}
}
@ -309,7 +309,7 @@ class NotificationSender {
}
/**
* <p>Convert nifi_flow_path inputs or outputs to a map of Referenceable keyed by qualifiedName.</p>
* <p>Convert nifi_flow_path inputs or outputs to a map of Referenceable keyed by type + qualifiedName.</p>
* <p>Atlas removes existing references those are not specified when a collection attribute is updated.
* In order to preserve existing DataSet references, existing elements should be passed within a partial update message.</p>
* <p>This method also populates entity cache for subsequent lookups.</p>
@ -329,17 +329,18 @@ class NotificationSender {
final String typeName = (String) ref.get(ATTR_TYPENAME);
final String guid = (String) ref.get(ATTR_GUID);
if (guidToQualifiedName.containsKey(guid)) {
if (guidToTypedQualifiedName.containsKey(guid)) {
metrics.dataSetCacheHit++;
}
final String refQualifiedName = guidToQualifiedName.computeIfAbsent(guid, k -> {
final String typedQualifiedName = guidToTypedQualifiedName.computeIfAbsent(guid, k -> {
try {
metrics.dataSetSearched++;
final AtlasEntity.AtlasEntityWithExtInfo refExt = atlasClient.searchEntityDef(new AtlasObjectId(guid, typeName));
final String qualifiedName = (String) refExt.getEntity().getAttribute(ATTR_QUALIFIED_NAME);
typedQualifiedNameToRef.put(toTypedQualifiedName(typeName, qualifiedName), new Referenceable(guid, typeName, Collections.EMPTY_MAP));
return qualifiedName;
String _typedQualifiedName = toTypedQualifiedName(typeName, qualifiedName);
typedQualifiedNameToRef.put(_typedQualifiedName, new Referenceable(guid, typeName, Collections.EMPTY_MAP));
return _typedQualifiedName;
} catch (AtlasServiceException e) {
if (ClientResponse.Status.NOT_FOUND.equals(e.getStatus())) {
logger.warn("{} entity was not found for guid {}", typeName, guid);
@ -350,10 +351,10 @@ class NotificationSender {
}
});
if (refQualifiedName == null) {
if (typedQualifiedName == null) {
return null;
}
return new Tuple<>(refQualifiedName, typedQualifiedNameToRef.get(toTypedQualifiedName(typeName, refQualifiedName)));
return new Tuple<>(typedQualifiedName, typedQualifiedNameToRef.get(typedQualifiedName));
}).filter(Objects::nonNull).filter(tuple -> tuple.getValue() != null)
// If duplication happens, use new value.
.collect(toMap(Tuple::getKey, Tuple::getValue, (oldValue, newValue) -> {
@ -381,13 +382,12 @@ class NotificationSender {
final Referenceable refFromCacheIfAvailable = typedQualifiedNameToRef.computeIfAbsent(typedRefQualifiedName, k -> {
if (id.isAssigned()) {
// If this referenceable has Guid assigned, then add this one to cache.
guidToQualifiedName.put(id._getId(), refQualifiedName);
typedQualifiedNameToRef.put(typedRefQualifiedName, ref);
guidToTypedQualifiedName.put(id._getId(), typedRefQualifiedName);
}
return ref;
});
return new Tuple<>(refQualifiedName, refFromCacheIfAvailable);
return new Tuple<>(typedRefQualifiedName, refFromCacheIfAvailable);
}).filter(tuple -> tuple.getValue() != null)
.collect(toMap(Tuple::getKey, Tuple::getValue));
}