From 2fb05cd8dc6b71d1808cc7447ce085a8e7ab8236 Mon Sep 17 00:00:00 2001 From: Peter Turcsanyi Date: Wed, 14 Apr 2021 21:23:14 +0200 Subject: [PATCH] NIFI-8430: Close Atlas client in order to free up resources This closes #5002 Signed-off-by: David Handermann --- .../apache/nifi/atlas/NiFiAtlasClient.java | 7 ++- .../atlas/reporting/ReportLineageToAtlas.java | 62 +++++++++---------- 2 files changed, 37 insertions(+), 32 deletions(-) diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java index 3436ff346d..e5c289a292 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java @@ -74,7 +74,7 @@ import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT; import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT; import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE; -public class NiFiAtlasClient { +public class NiFiAtlasClient implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class); @@ -84,6 +84,11 @@ public class NiFiAtlasClient { this.atlasClient = atlasClient; } + @Override + public void close() { + atlasClient.close(); + } + /** * This is an utility method to delete unused types. * Should be used during development or testing only. diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java index 9afacf539f..ca7ca366a7 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java @@ -814,46 +814,46 @@ public class ReportLineageToAtlas extends AbstractReportingTask { // If standalone or being primary node in a NiFi cluster, this node is responsible for doing primary tasks. final boolean isResponsibleForPrimaryTasks = !isClustered || getNodeTypeProvider().isPrimary(); - final NiFiAtlasClient atlasClient = createNiFiAtlasClient(context); + try (final NiFiAtlasClient atlasClient = createNiFiAtlasClient(context)) { - // Create Entity defs in Atlas if there's none yet. - if (!isTypeDefCreated) { - try { - if (isResponsibleForPrimaryTasks) { - // Create NiFi type definitions in Atlas type system. - atlasClient.registerNiFiTypeDefs(false); - } else { - // Otherwise, just check existence of NiFi type definitions. - if (!atlasClient.isNiFiTypeDefsRegistered()) { - getLogger().debug("NiFi type definitions are not ready in Atlas type system yet."); - return; + // Create Entity defs in Atlas if there's none yet. + if (!isTypeDefCreated) { + try { + if (isResponsibleForPrimaryTasks) { + // Create NiFi type definitions in Atlas type system. + atlasClient.registerNiFiTypeDefs(false); + } else { + // Otherwise, just check existence of NiFi type definitions. + if (!atlasClient.isNiFiTypeDefsRegistered()) { + getLogger().debug("NiFi type definitions are not ready in Atlas type system yet."); + return; + } } + isTypeDefCreated = true; + } catch (AtlasServiceException e) { + throw new RuntimeException("Failed to check and create NiFi flow type definitions in Atlas due to " + e, e); } - isTypeDefCreated = true; - } catch (AtlasServiceException e) { - throw new RuntimeException("Failed to check and create NiFi flow type definitions in Atlas due to " + e, e); } - } - // Regardless of whether being a primary task node, each node has to analyse NiFiFlow. - // Assuming each node has the same flow definition, that is guaranteed by NiFi cluster management mechanism. - final NiFiFlow nifiFlow = createNiFiFlow(context, atlasClient); + // Regardless of whether being a primary task node, each node has to analyse NiFiFlow. + // Assuming each node has the same flow definition, that is guaranteed by NiFi cluster management mechanism. + final NiFiFlow nifiFlow = createNiFiFlow(context, atlasClient); - if (isResponsibleForPrimaryTasks) { - try { - atlasClient.registerNiFiFlow(nifiFlow); - } catch (AtlasServiceException e) { - throw new RuntimeException("Failed to register NiFI flow. " + e, e); + if (isResponsibleForPrimaryTasks) { + try { + atlasClient.registerNiFiFlow(nifiFlow); + } catch (AtlasServiceException e) { + throw new RuntimeException("Failed to register NiFI flow. " + e, e); + } } + + // NOTE: There is a race condition between the primary node and other nodes. + // If a node notifies an event related to a NiFi component which is not yet created by NiFi primary node, + // then the notification message will fail due to having a reference to a non-existing entity. + nifiAtlasHook.setAtlasClient(atlasClient); + consumeNiFiProvenanceEvents(context, nifiFlow); } - - // NOTE: There is a race condition between the primary node and other nodes. - // If a node notifies an event related to a NiFi component which is not yet created by NiFi primary node, - // then the notification message will fail due to having a reference to a non-existing entity. - nifiAtlasHook.setAtlasClient(atlasClient); - consumeNiFiProvenanceEvents(context, nifiFlow); - } private NiFiFlow createNiFiFlow(ReportingContext context, NiFiAtlasClient atlasClient) {