diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java index feb2b48ccd..4e95a92a8f 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java @@ -18,7 +18,6 @@ package org.apache.nifi.atlas; import com.sun.jersey.api.client.UniformInterfaceException; import com.sun.jersey.core.util.MultivaluedMapImpl; -import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClientV2; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasServiceException; @@ -29,14 +28,12 @@ import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.typedef.AtlasEntityDef; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; import org.apache.atlas.model.typedef.AtlasTypesDef; -import org.apache.nifi.atlas.security.AtlasAuthN; import org.apache.nifi.util.StringUtils; import org.apache.nifi.util.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.ws.rs.core.MultivaluedMap; -import java.io.File; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -44,7 +41,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -82,44 +78,10 @@ public class NiFiAtlasClient { private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class); - private static NiFiAtlasClient nifiClient; - private AtlasClientV2 atlasClient; + private final AtlasClientV2 atlasClient; - private NiFiAtlasClient() { - super(); - } - - public static NiFiAtlasClient getInstance() { - if (nifiClient == null) { - synchronized (NiFiAtlasClient.class) { - if (nifiClient == null) { - nifiClient = new NiFiAtlasClient(); - } - } - } - return nifiClient; - } - - public void initialize(final String[] baseUrls, final AtlasAuthN authN, final File atlasConfDir) { - - synchronized (NiFiAtlasClient.class) { - - if (atlasClient != null) { - logger.info("{} had been setup but replacing it with new one.", atlasClient); - ApplicationProperties.forceReload(); - } - - if (atlasConfDir != null) { - // If atlasConfDir is not set, atlas-application.properties will be searched under classpath. - Properties props = System.getProperties(); - final String atlasConfProp = "atlas.conf"; - props.setProperty(atlasConfProp, atlasConfDir.getAbsolutePath()); - logger.debug("{} has been set to: {}", atlasConfProp, props.getProperty(atlasConfProp)); - } - - atlasClient = authN.createClient(baseUrls); - - } + public NiFiAtlasClient(AtlasClientV2 atlasClient) { + this.atlasClient = atlasClient; } /** diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java index 43fefff098..a15c93572d 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java @@ -60,7 +60,7 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext { private static final String CONF_PREFIX = "atlas.hook.nifi."; private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries"; - private final NiFiAtlasClient atlasClient; + private NiFiAtlasClient atlasClient; /** * An index to resolve a qualifiedName from a GUID. @@ -81,9 +81,7 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext { }; } - public NiFiAtlasHook(NiFiAtlasClient atlasClient) { - this.atlasClient = atlasClient; - + public NiFiAtlasHook() { final int qualifiedNameCacheSize = 10_000; this.guidToQualifiedName = createCache(qualifiedNameCacheSize); @@ -91,6 +89,10 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext { this.typedQualifiedNameToRef = createCache(dataSetRefCacheSize); } + public void setAtlasClient(NiFiAtlasClient atlasClient) { + this.atlasClient = atlasClient; + } + @Override protected String getNumberOfRetriesPropertyKey() { return HOOK_NUM_RETRIES; diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java index 5bb60244f7..9238f95319 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java @@ -17,6 +17,7 @@ package org.apache.nifi.atlas.reporting; import com.sun.jersey.api.client.ClientResponse; +import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasServiceException; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.producer.ProducerConfig; @@ -279,7 +280,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask { private static final String ATLAS_PROPERTY_KAFKA_BOOTSTRAP_SERVERS = ATLAS_KAFKA_PREFIX + "bootstrap.servers"; private static final String ATLAS_PROPERTY_KAFKA_CLIENT_ID = ATLAS_KAFKA_PREFIX + ProducerConfig.CLIENT_ID_CONFIG; private final ServiceLoader clusterResolverLoader = ServiceLoader.load(ClusterResolver.class); - private volatile NiFiAtlasClient atlasClient; + private volatile AtlasAuthN atlasAuthN; private volatile Properties atlasProperties; private volatile boolean isTypeDefCreated = false; private volatile String defaultClusterName; @@ -399,13 +400,13 @@ public class ReportLineageToAtlas extends AbstractReportingTask { @OnScheduled public void setup(ConfigurationContext context) throws IOException { // initAtlasClient has to be done first as it loads AtlasProperty. - initAtlasClient(context); + initAtlasProperties(context); initLineageStrategy(context); initClusterResolvers(context); } private void initLineageStrategy(ConfigurationContext context) throws IOException { - nifiAtlasHook = new NiFiAtlasHook(atlasClient); + nifiAtlasHook = new NiFiAtlasHook(); final String strategy = context.getProperty(NIFI_LINEAGE_STRATEGY).getValue(); if (LINEAGE_STRATEGY_SIMPLE_PATH.equals(strategy)) { @@ -428,7 +429,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask { } - private void initAtlasClient(ConfigurationContext context) throws IOException { + private void initAtlasProperties(ConfigurationContext context) throws IOException { List urls = new ArrayList<>(); parseAtlasUrls(context.getProperty(ATLAS_URLS), urls::add); final boolean isAtlasApiSecure = urls.stream().anyMatch(url -> url.toLowerCase().startsWith("https")); @@ -476,7 +477,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask { throw new ProcessException("Default cluster name is not defined."); } - final AtlasAuthN atlasAuthN = getAtlasAuthN(atlasAuthNMethod); + atlasAuthN = getAtlasAuthN(atlasAuthNMethod); atlasAuthN.configure(context); // Create Atlas configuration file if necessary. @@ -497,16 +498,32 @@ public class ReportLineageToAtlas extends AbstractReportingTask { } } + getLogger().debug("Force reloading Atlas application properties."); + ApplicationProperties.forceReload(); - atlasClient = NiFiAtlasClient.getInstance(); + if (confDir != null) { + // If atlasConfDir is not set, atlas-application.properties will be searched under classpath. + Properties props = System.getProperties(); + final String atlasConfProp = "atlas.conf"; + props.setProperty(atlasConfProp, confDir.getAbsolutePath()); + getLogger().debug("{} has been set to: {}", new Object[]{atlasConfProp, props.getProperty(atlasConfProp)}); + } + } + + /** + * In order to avoid authentication expiration issues (i.e. Kerberos ticket and DelegationToken expiration), + * create Atlas client instance at every onTrigger execution. + */ + private NiFiAtlasClient createNiFiAtlasClient(ReportingContext context) { + List urls = new ArrayList<>(); + parseAtlasUrls(context.getProperty(ATLAS_URLS), urls::add); try { - atlasClient.initialize(urls.toArray(new String[]{}), atlasAuthN, confDir); + return new NiFiAtlasClient(atlasAuthN.createClient(urls.toArray(new String[]{}))); } catch (final NullPointerException e) { throw new ProcessException(String.format("Failed to initialize Atlas client due to %s." + " Make sure 'atlas-application.properties' is in the directory specified with %s" + " or under root classpath if not specified.", e, ATLAS_CONF_DIR.getDisplayName()), e); } - } private AtlasAuthN getAtlasAuthN(String atlasAuthNMethod) { @@ -557,6 +574,8 @@ public class ReportLineageToAtlas extends AbstractReportingTask { // If standalone or being primary node in a NiFi cluster, this node is responsible for doing primary tasks. final boolean isResponsibleForPrimaryTasks = !isClustered || getNodeTypeProvider().isPrimary(); + final NiFiAtlasClient atlasClient = createNiFiAtlasClient(context); + // Create Entity defs in Atlas if there's none yet. if (!isTypeDefCreated) { try { @@ -578,7 +597,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask { // Regardless of whether being a primary task node, each node has to analyse NiFiFlow. // Assuming each node has the same flow definition, that is guaranteed by NiFi cluster management mechanism. - final NiFiFlow nifiFlow = createNiFiFlow(context); + final NiFiFlow nifiFlow = createNiFiFlow(context, atlasClient); if (isResponsibleForPrimaryTasks) { @@ -592,11 +611,12 @@ public class ReportLineageToAtlas extends AbstractReportingTask { // NOTE: There is a race condition between the primary node and other nodes. // If a node notifies an event related to a NiFi component which is not yet created by NiFi primary node, // then the notification message will fail due to having a reference to a non-existing entity. + nifiAtlasHook.setAtlasClient(atlasClient); consumeNiFiProvenanceEvents(context, nifiFlow); } - private NiFiFlow createNiFiFlow(ReportingContext context) { + private NiFiFlow createNiFiFlow(ReportingContext context, NiFiAtlasClient atlasClient) { final ProcessGroupStatus rootProcessGroup = context.getEventAccess().getGroupStatus("root"); final String flowName = rootProcessGroup.getName(); final String nifiUrl = context.getProperty(ATLAS_NIFI_URL).evaluateAttributeExpressions().getValue(); diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java index 88feba00c8..ab55b49fe4 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java @@ -73,7 +73,8 @@ public class Kerberos implements AtlasAuthN { UserGroupInformation.setConfiguration(hadoopConf); final UserGroupInformation ugi; try { - ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab); + UserGroupInformation.loginUserFromKeytab(principal, keytab); + ugi = UserGroupInformation.getCurrentUser(); } catch (IOException e) { throw new RuntimeException("Failed to login with Kerberos due to: " + e, e); } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/ITNiFiAtlasClient.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/ITNiFiAtlasClient.java index f1727b0a45..69a3042cf4 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/ITNiFiAtlasClient.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/ITNiFiAtlasClient.java @@ -39,14 +39,14 @@ public class ITNiFiAtlasClient { @Before public void setup() { - atlasClient = NiFiAtlasClient.getInstance(); // Add your atlas server ip address into /etc/hosts as atlas.example.com PropertyContext propertyContext = mock(PropertyContext.class); when(propertyContext.getProperty(ReportLineageToAtlas.ATLAS_USER)).thenReturn(new MockPropertyValue("admin")); when(propertyContext.getProperty(ReportLineageToAtlas.ATLAS_PASSWORD)).thenReturn(new MockPropertyValue("admin")); final AtlasAuthN atlasAuthN = new Basic(); atlasAuthN.configure(propertyContext); - atlasClient.initialize(new String[]{"http://atlas.example.com:21000/"}, atlasAuthN, null); + + atlasClient = new NiFiAtlasClient(atlasAuthN.createClient(new String[]{"http://atlas.example.com:21000/"})); } @Test diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java index 2fe7d07927..e83495adff 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java @@ -409,6 +409,7 @@ public class ITReportLineageToAtlas { when(eventAccess.getGroupStatus(eq("root"))).thenReturn(tc.rootPgStatus); final ProvenanceRepository provenanceRepository = mock(ProvenanceRepository.class); + when(eventAccess.getControllerStatus()).thenReturn(tc.rootPgStatus); when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository); when(eventAccess.getProvenanceEvents(eq(-1L), anyInt())).thenReturn(tc.provenanceRecords); when(provenanceRepository.getMaxEventId()).thenReturn((long) tc.provenanceRecords.size() - 1);