From 4b8c80cccc3207dc9fe6895266af95cca7c72250 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Fri, 5 Jan 2018 19:23:28 +0900 Subject: [PATCH] NIFI-4741: Avoid DelegationToken expiration at ReportLineageToAtlas. This closes #2377 The reporting task used to hold a single AtlasClientV2 instance throughout its runtime, from the time it was started until it was stopped. If it is configured to use Kerberos authentication for the Atlas REST API, then after an issued DelegationToken expires (10 hours by default), the reporting task is not able to recover from the 401 Unauthorized state. In order to avoid getting stuck in such a situation, this commit changes the way ReportLineageToAtlas uses AtlasClientV2 so that a new instance is created per onTrigger execution. It also addresses Kerberos ticket expiration. This approach incurs some overhead by initializing the client each time; however, it should be insignificant relative to the overall processing time, which includes analyzing the NiFi flow and Provenance records. 
--- .../apache/nifi/atlas/NiFiAtlasClient.java | 44 ++----------------- .../org/apache/nifi/atlas/NiFiAtlasHook.java | 10 +++-- .../atlas/reporting/ReportLineageToAtlas.java | 40 ++++++++++++----- .../apache/nifi/atlas/security/Kerberos.java | 3 +- .../apache/nifi/atlas/ITNiFiAtlasClient.java | 4 +- .../reporting/ITReportLineageToAtlas.java | 1 + 6 files changed, 44 insertions(+), 58 deletions(-) diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java index feb2b48ccd..4e95a92a8f 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java @@ -18,7 +18,6 @@ package org.apache.nifi.atlas; import com.sun.jersey.api.client.UniformInterfaceException; import com.sun.jersey.core.util.MultivaluedMapImpl; -import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClientV2; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasServiceException; @@ -29,14 +28,12 @@ import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.typedef.AtlasEntityDef; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; import org.apache.atlas.model.typedef.AtlasTypesDef; -import org.apache.nifi.atlas.security.AtlasAuthN; import org.apache.nifi.util.StringUtils; import org.apache.nifi.util.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.ws.rs.core.MultivaluedMap; -import java.io.File; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -44,7 +41,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; -import 
java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -82,44 +78,10 @@ public class NiFiAtlasClient { private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class); - private static NiFiAtlasClient nifiClient; - private AtlasClientV2 atlasClient; + private final AtlasClientV2 atlasClient; - private NiFiAtlasClient() { - super(); - } - - public static NiFiAtlasClient getInstance() { - if (nifiClient == null) { - synchronized (NiFiAtlasClient.class) { - if (nifiClient == null) { - nifiClient = new NiFiAtlasClient(); - } - } - } - return nifiClient; - } - - public void initialize(final String[] baseUrls, final AtlasAuthN authN, final File atlasConfDir) { - - synchronized (NiFiAtlasClient.class) { - - if (atlasClient != null) { - logger.info("{} had been setup but replacing it with new one.", atlasClient); - ApplicationProperties.forceReload(); - } - - if (atlasConfDir != null) { - // If atlasConfDir is not set, atlas-application.properties will be searched under classpath. 
- Properties props = System.getProperties(); - final String atlasConfProp = "atlas.conf"; - props.setProperty(atlasConfProp, atlasConfDir.getAbsolutePath()); - logger.debug("{} has been set to: {}", atlasConfProp, props.getProperty(atlasConfProp)); - } - - atlasClient = authN.createClient(baseUrls); - - } + public NiFiAtlasClient(AtlasClientV2 atlasClient) { + this.atlasClient = atlasClient; } /** diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java index 43fefff098..a15c93572d 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java @@ -60,7 +60,7 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext { private static final String CONF_PREFIX = "atlas.hook.nifi."; private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries"; - private final NiFiAtlasClient atlasClient; + private NiFiAtlasClient atlasClient; /** * An index to resolve a qualifiedName from a GUID. 
@@ -81,9 +81,7 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext { }; } - public NiFiAtlasHook(NiFiAtlasClient atlasClient) { - this.atlasClient = atlasClient; - + public NiFiAtlasHook() { final int qualifiedNameCacheSize = 10_000; this.guidToQualifiedName = createCache(qualifiedNameCacheSize); @@ -91,6 +89,10 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext { this.typedQualifiedNameToRef = createCache(dataSetRefCacheSize); } + public void setAtlasClient(NiFiAtlasClient atlasClient) { + this.atlasClient = atlasClient; + } + @Override protected String getNumberOfRetriesPropertyKey() { return HOOK_NUM_RETRIES; diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java index 5bb60244f7..9238f95319 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java @@ -17,6 +17,7 @@ package org.apache.nifi.atlas.reporting; import com.sun.jersey.api.client.ClientResponse; +import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasServiceException; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.producer.ProducerConfig; @@ -279,7 +280,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask { private static final String ATLAS_PROPERTY_KAFKA_BOOTSTRAP_SERVERS = ATLAS_KAFKA_PREFIX + "bootstrap.servers"; private static final String ATLAS_PROPERTY_KAFKA_CLIENT_ID = ATLAS_KAFKA_PREFIX + ProducerConfig.CLIENT_ID_CONFIG; private final ServiceLoader clusterResolverLoader = ServiceLoader.load(ClusterResolver.class); - private volatile 
NiFiAtlasClient atlasClient; + private volatile AtlasAuthN atlasAuthN; private volatile Properties atlasProperties; private volatile boolean isTypeDefCreated = false; private volatile String defaultClusterName; @@ -399,13 +400,13 @@ public class ReportLineageToAtlas extends AbstractReportingTask { @OnScheduled public void setup(ConfigurationContext context) throws IOException { // initAtlasClient has to be done first as it loads AtlasProperty. - initAtlasClient(context); + initAtlasProperties(context); initLineageStrategy(context); initClusterResolvers(context); } private void initLineageStrategy(ConfigurationContext context) throws IOException { - nifiAtlasHook = new NiFiAtlasHook(atlasClient); + nifiAtlasHook = new NiFiAtlasHook(); final String strategy = context.getProperty(NIFI_LINEAGE_STRATEGY).getValue(); if (LINEAGE_STRATEGY_SIMPLE_PATH.equals(strategy)) { @@ -428,7 +429,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask { } - private void initAtlasClient(ConfigurationContext context) throws IOException { + private void initAtlasProperties(ConfigurationContext context) throws IOException { List urls = new ArrayList<>(); parseAtlasUrls(context.getProperty(ATLAS_URLS), urls::add); final boolean isAtlasApiSecure = urls.stream().anyMatch(url -> url.toLowerCase().startsWith("https")); @@ -476,7 +477,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask { throw new ProcessException("Default cluster name is not defined."); } - final AtlasAuthN atlasAuthN = getAtlasAuthN(atlasAuthNMethod); + atlasAuthN = getAtlasAuthN(atlasAuthNMethod); atlasAuthN.configure(context); // Create Atlas configuration file if necessary. 
@@ -497,16 +498,32 @@ public class ReportLineageToAtlas extends AbstractReportingTask { } } + getLogger().debug("Force reloading Atlas application properties."); + ApplicationProperties.forceReload(); - atlasClient = NiFiAtlasClient.getInstance(); + if (confDir != null) { + // If atlasConfDir is not set, atlas-application.properties will be searched under classpath. + Properties props = System.getProperties(); + final String atlasConfProp = "atlas.conf"; + props.setProperty(atlasConfProp, confDir.getAbsolutePath()); + getLogger().debug("{} has been set to: {}", new Object[]{atlasConfProp, props.getProperty(atlasConfProp)}); + } + } + + /** + * In order to avoid authentication expiration issues (i.e. Kerberos ticket and DelegationToken expiration), + * create Atlas client instance at every onTrigger execution. + */ + private NiFiAtlasClient createNiFiAtlasClient(ReportingContext context) { + List urls = new ArrayList<>(); + parseAtlasUrls(context.getProperty(ATLAS_URLS), urls::add); try { - atlasClient.initialize(urls.toArray(new String[]{}), atlasAuthN, confDir); + return new NiFiAtlasClient(atlasAuthN.createClient(urls.toArray(new String[]{}))); } catch (final NullPointerException e) { throw new ProcessException(String.format("Failed to initialize Atlas client due to %s." + " Make sure 'atlas-application.properties' is in the directory specified with %s" + " or under root classpath if not specified.", e, ATLAS_CONF_DIR.getDisplayName()), e); } - } private AtlasAuthN getAtlasAuthN(String atlasAuthNMethod) { @@ -557,6 +574,8 @@ public class ReportLineageToAtlas extends AbstractReportingTask { // If standalone or being primary node in a NiFi cluster, this node is responsible for doing primary tasks. final boolean isResponsibleForPrimaryTasks = !isClustered || getNodeTypeProvider().isPrimary(); + final NiFiAtlasClient atlasClient = createNiFiAtlasClient(context); + // Create Entity defs in Atlas if there's none yet. 
if (!isTypeDefCreated) { try { @@ -578,7 +597,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask { // Regardless of whether being a primary task node, each node has to analyse NiFiFlow. // Assuming each node has the same flow definition, that is guaranteed by NiFi cluster management mechanism. - final NiFiFlow nifiFlow = createNiFiFlow(context); + final NiFiFlow nifiFlow = createNiFiFlow(context, atlasClient); if (isResponsibleForPrimaryTasks) { @@ -592,11 +611,12 @@ public class ReportLineageToAtlas extends AbstractReportingTask { // NOTE: There is a race condition between the primary node and other nodes. // If a node notifies an event related to a NiFi component which is not yet created by NiFi primary node, // then the notification message will fail due to having a reference to a non-existing entity. + nifiAtlasHook.setAtlasClient(atlasClient); consumeNiFiProvenanceEvents(context, nifiFlow); } - private NiFiFlow createNiFiFlow(ReportingContext context) { + private NiFiFlow createNiFiFlow(ReportingContext context, NiFiAtlasClient atlasClient) { final ProcessGroupStatus rootProcessGroup = context.getEventAccess().getGroupStatus("root"); final String flowName = rootProcessGroup.getName(); final String nifiUrl = context.getProperty(ATLAS_NIFI_URL).evaluateAttributeExpressions().getValue(); diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java index 88feba00c8..ab55b49fe4 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java @@ -73,7 +73,8 @@ public class Kerberos implements AtlasAuthN { UserGroupInformation.setConfiguration(hadoopConf); final 
UserGroupInformation ugi; try { - ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab); + UserGroupInformation.loginUserFromKeytab(principal, keytab); + ugi = UserGroupInformation.getCurrentUser(); } catch (IOException e) { throw new RuntimeException("Failed to login with Kerberos due to: " + e, e); } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/ITNiFiAtlasClient.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/ITNiFiAtlasClient.java index f1727b0a45..69a3042cf4 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/ITNiFiAtlasClient.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/ITNiFiAtlasClient.java @@ -39,14 +39,14 @@ public class ITNiFiAtlasClient { @Before public void setup() { - atlasClient = NiFiAtlasClient.getInstance(); // Add your atlas server ip address into /etc/hosts as atlas.example.com PropertyContext propertyContext = mock(PropertyContext.class); when(propertyContext.getProperty(ReportLineageToAtlas.ATLAS_USER)).thenReturn(new MockPropertyValue("admin")); when(propertyContext.getProperty(ReportLineageToAtlas.ATLAS_PASSWORD)).thenReturn(new MockPropertyValue("admin")); final AtlasAuthN atlasAuthN = new Basic(); atlasAuthN.configure(propertyContext); - atlasClient.initialize(new String[]{"http://atlas.example.com:21000/"}, atlasAuthN, null); + + atlasClient = new NiFiAtlasClient(atlasAuthN.createClient(new String[]{"http://atlas.example.com:21000/"})); } @Test diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java index 2fe7d07927..e83495adff 100644 --- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java @@ -409,6 +409,7 @@ public class ITReportLineageToAtlas { when(eventAccess.getGroupStatus(eq("root"))).thenReturn(tc.rootPgStatus); final ProvenanceRepository provenanceRepository = mock(ProvenanceRepository.class); + when(eventAccess.getControllerStatus()).thenReturn(tc.rootPgStatus); when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository); when(eventAccess.getProvenanceEvents(eq(-1L), anyInt())).thenReturn(tc.provenanceRecords); when(provenanceRepository.getMaxEventId()).thenReturn((long) tc.provenanceRecords.size() - 1);