NIFI-4741: Avoid DelegationToken expiration at ReportLineageToAtlas. This closes #2377

The reporting task used to hold a single AtlasClientV2 instance
throughout its runtime starting from being started until being stopped.
If it is configured to use Kerberos authentication for Atlas REST API, after
a published DelegationToken expires (10 hours by default), the reporting
task will not be able to recover from 401 Unauthorized state.

In order to avoid stucking in such situation, this commit changes the
way ReportLineageToAtlas uses AtlasClientV2 instance to create an
instance per onTrigger execution. It also addresses Kerberos ticket
expiration.

This approach incurs some overheads by initiating the client each time,
however, it should be insignificant from an overall processing time
perspective including analyzing NiFi flow and Provenance records.
This commit is contained in:
Koji Kawamura 2018-01-05 19:23:28 +09:00 committed by Matt Gilman
parent e5ed62a98f
commit 4b8c80cccc
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
6 changed files with 44 additions and 58 deletions

View File

@ -18,7 +18,6 @@ package org.apache.nifi.atlas;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasServiceException;
@ -29,14 +28,12 @@ import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.nifi.atlas.security.AtlasAuthN;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.core.MultivaluedMap;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -44,7 +41,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
@ -82,44 +78,10 @@ public class NiFiAtlasClient {
private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class);
private static NiFiAtlasClient nifiClient;
private AtlasClientV2 atlasClient;
private final AtlasClientV2 atlasClient;
private NiFiAtlasClient() {
super();
}
public static NiFiAtlasClient getInstance() {
if (nifiClient == null) {
synchronized (NiFiAtlasClient.class) {
if (nifiClient == null) {
nifiClient = new NiFiAtlasClient();
}
}
}
return nifiClient;
}
public void initialize(final String[] baseUrls, final AtlasAuthN authN, final File atlasConfDir) {
synchronized (NiFiAtlasClient.class) {
if (atlasClient != null) {
logger.info("{} had been setup but replacing it with new one.", atlasClient);
ApplicationProperties.forceReload();
}
if (atlasConfDir != null) {
// If atlasConfDir is not set, atlas-application.properties will be searched under classpath.
Properties props = System.getProperties();
final String atlasConfProp = "atlas.conf";
props.setProperty(atlasConfProp, atlasConfDir.getAbsolutePath());
logger.debug("{} has been set to: {}", atlasConfProp, props.getProperty(atlasConfProp));
}
atlasClient = authN.createClient(baseUrls);
}
public NiFiAtlasClient(AtlasClientV2 atlasClient) {
this.atlasClient = atlasClient;
}
/**

View File

@ -60,7 +60,7 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext {
private static final String CONF_PREFIX = "atlas.hook.nifi.";
private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
private final NiFiAtlasClient atlasClient;
private NiFiAtlasClient atlasClient;
/**
* An index to resolve a qualifiedName from a GUID.
@ -81,9 +81,7 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext {
};
}
public NiFiAtlasHook(NiFiAtlasClient atlasClient) {
this.atlasClient = atlasClient;
public NiFiAtlasHook() {
final int qualifiedNameCacheSize = 10_000;
this.guidToQualifiedName = createCache(qualifiedNameCacheSize);
@ -91,6 +89,10 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext {
this.typedQualifiedNameToRef = createCache(dataSetRefCacheSize);
}
public void setAtlasClient(NiFiAtlasClient atlasClient) {
this.atlasClient = atlasClient;
}
@Override
protected String getNumberOfRetriesPropertyKey() {
return HOOK_NUM_RETRIES;

View File

@ -17,6 +17,7 @@
package org.apache.nifi.atlas.reporting;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasServiceException;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
@ -279,7 +280,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
private static final String ATLAS_PROPERTY_KAFKA_BOOTSTRAP_SERVERS = ATLAS_KAFKA_PREFIX + "bootstrap.servers";
private static final String ATLAS_PROPERTY_KAFKA_CLIENT_ID = ATLAS_KAFKA_PREFIX + ProducerConfig.CLIENT_ID_CONFIG;
private final ServiceLoader<ClusterResolver> clusterResolverLoader = ServiceLoader.load(ClusterResolver.class);
private volatile NiFiAtlasClient atlasClient;
private volatile AtlasAuthN atlasAuthN;
private volatile Properties atlasProperties;
private volatile boolean isTypeDefCreated = false;
private volatile String defaultClusterName;
@ -399,13 +400,13 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
@OnScheduled
public void setup(ConfigurationContext context) throws IOException {
// initAtlasClient has to be done first as it loads AtlasProperty.
initAtlasClient(context);
initAtlasProperties(context);
initLineageStrategy(context);
initClusterResolvers(context);
}
private void initLineageStrategy(ConfigurationContext context) throws IOException {
nifiAtlasHook = new NiFiAtlasHook(atlasClient);
nifiAtlasHook = new NiFiAtlasHook();
final String strategy = context.getProperty(NIFI_LINEAGE_STRATEGY).getValue();
if (LINEAGE_STRATEGY_SIMPLE_PATH.equals(strategy)) {
@ -428,7 +429,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
}
private void initAtlasClient(ConfigurationContext context) throws IOException {
private void initAtlasProperties(ConfigurationContext context) throws IOException {
List<String> urls = new ArrayList<>();
parseAtlasUrls(context.getProperty(ATLAS_URLS), urls::add);
final boolean isAtlasApiSecure = urls.stream().anyMatch(url -> url.toLowerCase().startsWith("https"));
@ -476,7 +477,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
throw new ProcessException("Default cluster name is not defined.");
}
final AtlasAuthN atlasAuthN = getAtlasAuthN(atlasAuthNMethod);
atlasAuthN = getAtlasAuthN(atlasAuthNMethod);
atlasAuthN.configure(context);
// Create Atlas configuration file if necessary.
@ -497,16 +498,32 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
}
}
getLogger().debug("Force reloading Atlas application properties.");
ApplicationProperties.forceReload();
atlasClient = NiFiAtlasClient.getInstance();
if (confDir != null) {
// If atlasConfDir is not set, atlas-application.properties will be searched under classpath.
Properties props = System.getProperties();
final String atlasConfProp = "atlas.conf";
props.setProperty(atlasConfProp, confDir.getAbsolutePath());
getLogger().debug("{} has been set to: {}", new Object[]{atlasConfProp, props.getProperty(atlasConfProp)});
}
}
/**
* In order to avoid authentication expiration issues (i.e. Kerberos ticket and DelegationToken expiration),
* create Atlas client instance at every onTrigger execution.
*/
private NiFiAtlasClient createNiFiAtlasClient(ReportingContext context) {
List<String> urls = new ArrayList<>();
parseAtlasUrls(context.getProperty(ATLAS_URLS), urls::add);
try {
atlasClient.initialize(urls.toArray(new String[]{}), atlasAuthN, confDir);
return new NiFiAtlasClient(atlasAuthN.createClient(urls.toArray(new String[]{})));
} catch (final NullPointerException e) {
throw new ProcessException(String.format("Failed to initialize Atlas client due to %s." +
" Make sure 'atlas-application.properties' is in the directory specified with %s" +
" or under root classpath if not specified.", e, ATLAS_CONF_DIR.getDisplayName()), e);
}
}
private AtlasAuthN getAtlasAuthN(String atlasAuthNMethod) {
@ -557,6 +574,8 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
// If standalone or being primary node in a NiFi cluster, this node is responsible for doing primary tasks.
final boolean isResponsibleForPrimaryTasks = !isClustered || getNodeTypeProvider().isPrimary();
final NiFiAtlasClient atlasClient = createNiFiAtlasClient(context);
// Create Entity defs in Atlas if there's none yet.
if (!isTypeDefCreated) {
try {
@ -578,7 +597,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
// Regardless of whether being a primary task node, each node has to analyse NiFiFlow.
// Assuming each node has the same flow definition, that is guaranteed by NiFi cluster management mechanism.
final NiFiFlow nifiFlow = createNiFiFlow(context);
final NiFiFlow nifiFlow = createNiFiFlow(context, atlasClient);
if (isResponsibleForPrimaryTasks) {
@ -592,11 +611,12 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
// NOTE: There is a race condition between the primary node and other nodes.
// If a node notifies an event related to a NiFi component which is not yet created by NiFi primary node,
// then the notification message will fail due to having a reference to a non-existing entity.
nifiAtlasHook.setAtlasClient(atlasClient);
consumeNiFiProvenanceEvents(context, nifiFlow);
}
private NiFiFlow createNiFiFlow(ReportingContext context) {
private NiFiFlow createNiFiFlow(ReportingContext context, NiFiAtlasClient atlasClient) {
final ProcessGroupStatus rootProcessGroup = context.getEventAccess().getGroupStatus("root");
final String flowName = rootProcessGroup.getName();
final String nifiUrl = context.getProperty(ATLAS_NIFI_URL).evaluateAttributeExpressions().getValue();

View File

@ -73,7 +73,8 @@ public class Kerberos implements AtlasAuthN {
UserGroupInformation.setConfiguration(hadoopConf);
final UserGroupInformation ugi;
try {
ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
UserGroupInformation.loginUserFromKeytab(principal, keytab);
ugi = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
throw new RuntimeException("Failed to login with Kerberos due to: " + e, e);
}

View File

@ -39,14 +39,14 @@ public class ITNiFiAtlasClient {
@Before
public void setup() {
atlasClient = NiFiAtlasClient.getInstance();
// Add your atlas server ip address into /etc/hosts as atlas.example.com
PropertyContext propertyContext = mock(PropertyContext.class);
when(propertyContext.getProperty(ReportLineageToAtlas.ATLAS_USER)).thenReturn(new MockPropertyValue("admin"));
when(propertyContext.getProperty(ReportLineageToAtlas.ATLAS_PASSWORD)).thenReturn(new MockPropertyValue("admin"));
final AtlasAuthN atlasAuthN = new Basic();
atlasAuthN.configure(propertyContext);
atlasClient.initialize(new String[]{"http://atlas.example.com:21000/"}, atlasAuthN, null);
atlasClient = new NiFiAtlasClient(atlasAuthN.createClient(new String[]{"http://atlas.example.com:21000/"}));
}
@Test

View File

@ -409,6 +409,7 @@ public class ITReportLineageToAtlas {
when(eventAccess.getGroupStatus(eq("root"))).thenReturn(tc.rootPgStatus);
final ProvenanceRepository provenanceRepository = mock(ProvenanceRepository.class);
when(eventAccess.getControllerStatus()).thenReturn(tc.rootPgStatus);
when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
when(eventAccess.getProvenanceEvents(eq(-1L), anyInt())).thenReturn(tc.provenanceRecords);
when(provenanceRepository.getMaxEventId()).thenReturn((long) tc.provenanceRecords.size() - 1);