NIFI-8430: Close Atlas client in order to free up resources

This closes #5002

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Peter Turcsanyi 2021-04-14 21:23:14 +02:00 committed by exceptionfactory
parent 8c66049d1d
commit 2fb05cd8dc
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
2 changed files with 37 additions and 32 deletions

View File

@ -74,7 +74,7 @@ import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT; import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE; import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
public class NiFiAtlasClient { public class NiFiAtlasClient implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class); private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class);
@ -84,6 +84,11 @@ public class NiFiAtlasClient {
this.atlasClient = atlasClient; this.atlasClient = atlasClient;
} }
@Override
public void close() {
atlasClient.close();
}
/** /**
* This is an utility method to delete unused types. * This is an utility method to delete unused types.
* Should be used during development or testing only. * Should be used during development or testing only.

View File

@ -814,46 +814,46 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
// If standalone or being primary node in a NiFi cluster, this node is responsible for doing primary tasks. // If standalone or being primary node in a NiFi cluster, this node is responsible for doing primary tasks.
final boolean isResponsibleForPrimaryTasks = !isClustered || getNodeTypeProvider().isPrimary(); final boolean isResponsibleForPrimaryTasks = !isClustered || getNodeTypeProvider().isPrimary();
final NiFiAtlasClient atlasClient = createNiFiAtlasClient(context); try (final NiFiAtlasClient atlasClient = createNiFiAtlasClient(context)) {
// Create Entity defs in Atlas if there's none yet. // Create Entity defs in Atlas if there's none yet.
if (!isTypeDefCreated) { if (!isTypeDefCreated) {
try { try {
if (isResponsibleForPrimaryTasks) { if (isResponsibleForPrimaryTasks) {
// Create NiFi type definitions in Atlas type system. // Create NiFi type definitions in Atlas type system.
atlasClient.registerNiFiTypeDefs(false); atlasClient.registerNiFiTypeDefs(false);
} else { } else {
// Otherwise, just check existence of NiFi type definitions. // Otherwise, just check existence of NiFi type definitions.
if (!atlasClient.isNiFiTypeDefsRegistered()) { if (!atlasClient.isNiFiTypeDefsRegistered()) {
getLogger().debug("NiFi type definitions are not ready in Atlas type system yet."); getLogger().debug("NiFi type definitions are not ready in Atlas type system yet.");
return; return;
}
} }
isTypeDefCreated = true;
} catch (AtlasServiceException e) {
throw new RuntimeException("Failed to check and create NiFi flow type definitions in Atlas due to " + e, e);
} }
isTypeDefCreated = true;
} catch (AtlasServiceException e) {
throw new RuntimeException("Failed to check and create NiFi flow type definitions in Atlas due to " + e, e);
} }
}
// Regardless of whether being a primary task node, each node has to analyse NiFiFlow. // Regardless of whether being a primary task node, each node has to analyse NiFiFlow.
// Assuming each node has the same flow definition, that is guaranteed by NiFi cluster management mechanism. // Assuming each node has the same flow definition, that is guaranteed by NiFi cluster management mechanism.
final NiFiFlow nifiFlow = createNiFiFlow(context, atlasClient); final NiFiFlow nifiFlow = createNiFiFlow(context, atlasClient);
if (isResponsibleForPrimaryTasks) { if (isResponsibleForPrimaryTasks) {
try { try {
atlasClient.registerNiFiFlow(nifiFlow); atlasClient.registerNiFiFlow(nifiFlow);
} catch (AtlasServiceException e) { } catch (AtlasServiceException e) {
throw new RuntimeException("Failed to register NiFI flow. " + e, e); throw new RuntimeException("Failed to register NiFI flow. " + e, e);
}
} }
// NOTE: There is a race condition between the primary node and other nodes.
// If a node notifies an event related to a NiFi component which is not yet created by NiFi primary node,
// then the notification message will fail due to having a reference to a non-existing entity.
nifiAtlasHook.setAtlasClient(atlasClient);
consumeNiFiProvenanceEvents(context, nifiFlow);
} }
// NOTE: There is a race condition between the primary node and other nodes.
// If a node notifies an event related to a NiFi component which is not yet created by NiFi primary node,
// then the notification message will fail due to having a reference to a non-existing entity.
nifiAtlasHook.setAtlasClient(atlasClient);
consumeNiFiProvenanceEvents(context, nifiFlow);
} }
private NiFiFlow createNiFiFlow(ReportingContext context, NiFiAtlasClient atlasClient) { private NiFiFlow createNiFiFlow(ReportingContext context, NiFiAtlasClient atlasClient) {