NIFI-6939: Upgrade Atlas client dependency to 2.0.0

NIFI-6939: Review changes

This closes #3944
Peter Turcsanyi 2019-12-19 16:53:42 +01:00 committed by Matt Gilman
parent c604923c0b
commit cc74534bc0
41 changed files with 422 additions and 428 deletions
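
Most of the change is mechanical: classes from the old atlas-client typesystem move to their atlas-client-v2 2.0.0 equivalents. A summary of the recurring class migrations, drawn directly from the hunks below:

// Recurring class migrations in this commit (old -> new):
//   org.apache.atlas.typesystem.Referenceable              -> org.apache.atlas.v1.model.instance.Referenceable
//   org.apache.atlas.typesystem.persistence.Id             -> org.apache.atlas.v1.model.instance.Id
//   org.apache.atlas.notification.hook.HookNotification
//       .HookNotificationMessage                           -> org.apache.atlas.model.notification.HookNotification
//   org.apache.atlas.notification.hook.HookNotification
//       .EntityCreateRequest / .EntityPartialUpdateRequest -> org.apache.atlas.v1.model.notification.HookNotificationV1.*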


@ -48,49 +48,24 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-credentials-service-api</artifactId>
</dependency>
<!-- Explicitly force beanutils 1.9.3 because versions prior to 1.9.2 had a vuln
Can remove this once atlas client which depends on hadoop-common uses a more recent version -->
<!-- Explicitly force beanutils 1.9.4 in order to avoid vulnerabilities in earlier versions.
Can remove this once atlas client which depends on hadoop-common uses a more recent version. -->
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.4</version>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-client</artifactId>
<!-- Exclude dependencies to reduce NAR file size -->
<exclusions>
<!-- NOTE: Scala is required by atlas notification -->
<!--
fastutil-6.5.16.jar is 16MB.
'fastutil' is only used by
org.apache.atlas.repository.memory.AttributeStores
which is deprecated as being part of V1 API.
-->
<exclusion>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</exclusion>
<!-- Explicit dep referred to in POM above. commons-beanutils and commons-beanutils-core merged in 1.9.0 -->
<exclusion>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils-core</artifactId>
</exclusion>
</exclusions>
<artifactId>atlas-client-v2</artifactId>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-notification</artifactId>
</dependency>
<!--
NOTE: Could not use nifi-hadoop-libraries-nar because hadoop-client uses httpclient-4.2.5,
but atlas-client uses httpclient-4.5.3.
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
-->
<dependency>
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>


@ -17,7 +17,7 @@
package org.apache.nifi.atlas.hook;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.nifi.atlas.NiFiAtlasClient;
import org.apache.nifi.atlas.provenance.lineage.LineageContext;
@ -26,38 +26,29 @@ import java.util.List;
/**
* This class is not thread-safe as it holds uncommitted notification messages within the instance.
* {@link #addMessage(HookNotificationMessage)} and {@link #commitMessages()} should be used serially from a single thread.
* {@link #addMessage(HookNotification)} and {@link #commitMessages()} should be used serially from a single thread.
*/
public class NiFiAtlasHook extends AtlasHook implements LineageContext {
public static final String NIFI_USER = "nifi";
private static final String CONF_PREFIX = "atlas.hook.nifi.";
private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
private NiFiAtlasClient atlasClient;
public void setAtlasClient(NiFiAtlasClient atlasClient) {
this.atlasClient = atlasClient;
}
@Override
protected String getNumberOfRetriesPropertyKey() {
return HOOK_NUM_RETRIES;
}
private final List<HookNotificationMessage> messages = new ArrayList<>();
private final List<HookNotification> messages = new ArrayList<>();
@Override
public void addMessage(HookNotificationMessage message) {
public void addMessage(HookNotification message) {
messages.add(message);
}
public void commitMessages() {
final NotificationSender notificationSender = createNotificationSender();
notificationSender.setAtlasClient(atlasClient);
List<HookNotificationMessage> messagesBatch = new ArrayList<>(messages);
List<HookNotification> messagesBatch = new ArrayList<>(messages);
messages.clear();
notificationSender.send(messagesBatch, this::notifyEntities);
}
@ -72,7 +63,7 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext {
return new NotificationSender();
}
List<HookNotificationMessage> getMessages() {
List<HookNotification> getMessages() {
return messages;
}
}
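
A minimal usage sketch of the reworked hook (hypothetical wiring; atlasClient and flowPathRef stand in for real instances):

NiFiAtlasHook hook = new NiFiAtlasHook();
hook.setAtlasClient(atlasClient); // NiFiAtlasClient used by the NotificationSender during commit
hook.addMessage(new HookNotificationV1.EntityCreateRequest(NiFiAtlasHook.NIFI_USER, flowPathRef));
hook.commitMessages();            // drains the buffered messages and sends them in de-duplicated batches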


@ -20,9 +20,11 @@ import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.v1.model.instance.Id;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.notification.HookNotificationV1;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.atlas.AtlasUtils;
import org.apache.nifi.atlas.NiFiAtlasClient;
import org.apache.nifi.util.Tuple;
@ -39,23 +41,24 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toMap;
import static org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_CREATE;
import static org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE;
import static org.apache.atlas.model.notification.HookNotification.HookNotificationType.ENTITY_CREATE;
import static org.apache.atlas.model.notification.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE;
import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName;
import static org.apache.nifi.atlas.hook.NiFiAtlasHook.NIFI_USER;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
import static org.apache.nifi.atlas.hook.NiFiAtlasHook.NIFI_USER;
/**
* This class implements Atlas hook notification message deduplication mechanism.
@ -175,7 +178,7 @@ class NotificationSender {
* <p>Send hook notification messages.
* In order to notify relationships between 'nifi_flow_path' and its inputs/outputs, this method sends messages in following order:</p>
* <ol>
* <li>As a single {@link org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest} message:
* <li>As a single {@link org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest} message:
* <ul>
* <li>New entities except 'nifi_flow_path', including DataSets such as 'nifi_queue', 'kafka_topic' or 'hive_table' ... etc,
so that 'nifi_flow_path' can refer to them</li>
@ -190,21 +193,21 @@ class NotificationSender {
* @param messages list of messages to be sent
* @param notifier responsible for sending notification messages, its accept method can be called multiple times
*/
void send(final List<HookNotification.HookNotificationMessage> messages, final Consumer<List<HookNotification.HookNotificationMessage>> notifier) {
void send(final List<HookNotification> messages, final BiConsumer<List<HookNotification>, UserGroupInformation> notifier) {
logger.info("Sending {} messages to Atlas", messages.size());
final Metrics metrics = new Metrics();
try {
metrics.totalMessages = messages.size();
final Map<Boolean, List<HookNotification.HookNotificationMessage>> createAndOthers = messages.stream().collect(groupingBy(msg -> ENTITY_CREATE.equals(msg.getType())));
final Map<Boolean, List<HookNotification>> createAndOthers = messages.stream().collect(groupingBy(msg -> ENTITY_CREATE.equals(msg.getType())));
final List<HookNotification.HookNotificationMessage> creates = safeGet(createAndOthers, true);
final List<HookNotification> creates = safeGet(createAndOthers, true);
metrics.createMessages = creates.size();
final Map<Boolean, List<Referenceable>> newFlowPathsAndOtherEntities = creates.stream()
.flatMap(msg -> ((HookNotification.EntityCreateRequest) msg).getEntities().stream())
.collect(groupingBy(ref -> TYPE_NIFI_FLOW_PATH.equals(ref.typeName)));
.flatMap(msg -> ((HookNotificationV1.EntityCreateRequest) msg).getEntities().stream())
.collect(groupingBy(ref -> TYPE_NIFI_FLOW_PATH.equals(ref.getTypeName())));
// Deduplicate same entity creation messages.
final List<Referenceable> newEntitiesExceptFlowPaths = safeGet(newFlowPathsAndOtherEntities, false)
@ -227,28 +230,28 @@ class NotificationSender {
newEntities.addAll(newEntitiesExceptFlowPaths);
newEntities.addAll(newFlowPaths);
if (!newEntities.isEmpty()) {
notifier.accept(Collections.singletonList(new HookNotification.EntityCreateRequest(NIFI_USER, newEntities)));
notifier.accept(Collections.singletonList(new HookNotificationV1.EntityCreateRequest(NIFI_USER, newEntities)), null);
}
final Map<Boolean, List<HookNotification.HookNotificationMessage>> partialNiFiFlowPathUpdateAndOthers
final Map<Boolean, List<HookNotification>> partialNiFiFlowPathUpdateAndOthers
= safeGet(createAndOthers, false).stream().collect(groupingBy(msg
-> ENTITY_PARTIAL_UPDATE.equals(msg.getType())
&& TYPE_NIFI_FLOW_PATH.equals(((HookNotification.EntityPartialUpdateRequest)msg).getTypeName())
&& ATTR_QUALIFIED_NAME.equals(((HookNotification.EntityPartialUpdateRequest)msg).getAttribute())
&& TYPE_NIFI_FLOW_PATH.equals(((HookNotificationV1.EntityPartialUpdateRequest)msg).getTypeName())
&& ATTR_QUALIFIED_NAME.equals(((HookNotificationV1.EntityPartialUpdateRequest)msg).getAttribute())
));
// These updates are made against existing flow path entities.
final List<HookNotification.HookNotificationMessage> partialNiFiFlowPathUpdates = safeGet(partialNiFiFlowPathUpdateAndOthers, true);
final List<HookNotification.HookNotificationMessage> otherMessages = safeGet(partialNiFiFlowPathUpdateAndOthers, false);
final List<HookNotification> partialNiFiFlowPathUpdates = safeGet(partialNiFiFlowPathUpdateAndOthers, true);
final List<HookNotification> otherMessages = safeGet(partialNiFiFlowPathUpdateAndOthers, false);
metrics.partialNiFiFlowPathUpdates = partialNiFiFlowPathUpdates.size();
metrics.otherMessages = otherMessages.size();
// 2. Notify de-duplicated 'nifi_flow_path' updates
final List<HookNotification.HookNotificationMessage> deduplicatedMessages = partialNiFiFlowPathUpdates.stream().map(msg -> (HookNotification.EntityPartialUpdateRequest) msg)
final List<HookNotification> deduplicatedMessages = partialNiFiFlowPathUpdates.stream().map(msg -> (HookNotificationV1.EntityPartialUpdateRequest) msg)
// Group by nifi_flow_path qualifiedName value.
.collect(groupingBy(HookNotification.EntityPartialUpdateRequest::getAttributeValue)).entrySet().stream()
.collect(groupingBy(HookNotificationV1.EntityPartialUpdateRequest::getAttributeValue)).entrySet().stream()
.map(entry -> {
final String flowPathQualifiedName = entry.getKey();
final Map<String, Referenceable> distinctInputs;
@ -274,7 +277,7 @@ class NotificationSender {
}
// Merge all inputs and outputs for this nifi_flow_path.
for (HookNotification.EntityPartialUpdateRequest msg : entry.getValue()) {
for (HookNotificationV1.EntityPartialUpdateRequest msg : entry.getValue()) {
fromReferenceable(msg.getEntity().get(ATTR_INPUTS), metrics)
.entrySet().stream().filter(ref -> !distinctInputs.containsKey(ref.getKey()))
.forEach(ref -> distinctInputs.put(ref.getKey(), ref.getValue()));
@ -290,17 +293,17 @@ class NotificationSender {
// org.json4s.package$MappingException: Can't find ScalaSig for class org.apache.atlas.typesystem.Referenceable
flowPathRef.set(ATTR_INPUTS, new ArrayList<>(distinctInputs.values()));
flowPathRef.set(ATTR_OUTPUTS, new ArrayList<>(distinctOutputs.values()));
return new HookNotification.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH,
return new HookNotificationV1.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH,
ATTR_QUALIFIED_NAME, flowPathQualifiedName, flowPathRef);
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
metrics.uniquePartialNiFiFlowPathUpdates = deduplicatedMessages.size();
notifier.accept(deduplicatedMessages);
notifier.accept(deduplicatedMessages, null);
// 3. Notify other messages
notifier.accept(otherMessages);
notifier.accept(otherMessages, null);
} finally {
logger.info(metrics.toLogString("Finished"));
@ -380,7 +383,7 @@ class NotificationSender {
final String typedRefQualifiedName = toTypedQualifiedName(typeName, refQualifiedName);
final Referenceable refFromCacheIfAvailable = typedQualifiedNameToRef.computeIfAbsent(typedRefQualifiedName, k -> {
if (id.isAssigned()) {
if (isAssigned(id)) {
// If this referenceable has Guid assigned, then add this one to cache.
guidToTypedQualifiedName.put(id._getId(), typedRefQualifiedName);
}
@ -391,4 +394,15 @@ class NotificationSender {
}).filter(tuple -> tuple.getValue() != null)
.collect(toMap(Tuple::getKey, Tuple::getValue));
}
// Copy of org.apache.atlas.typesystem.persistence.Id.isAssigned() from v0.8.1. This method does not exist in v2.0.0.
private boolean isAssigned(Id id) {
try {
UUID.fromString(id.getId());
} catch (IllegalArgumentException e) {
return false;
}
return true;
}
}
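
The notifier callback widens from Consumer to BiConsumer so that a UserGroupInformation can be passed through to AtlasHook.notifyEntities; this commit passes null for the batches it builds. A hedged sketch of the new contract, with a logging notifier standing in for the real one:

NotificationSender sender = new NotificationSender();
sender.setAtlasClient(atlasClient); // hypothetical NiFiAtlasClient, as wired in NiFiAtlasHook
sender.send(messages, (batch, ugi) -> {
    // in production this is NiFiAtlasHook's method reference to notifyEntities; ugi is null here
    logger.info("would send {} messages", batch.size());
});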


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.provenance.ProvenanceEventType;
import java.net.MalformedURLException;


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Referenceable;
import java.util.Collections;
import java.util.LinkedHashSet;


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
import org.apache.nifi.util.Tuple;


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.provenance.ProvenanceEventRecord;


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.controller.status.ConnectionStatus;


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.controller.status.ConnectionStatus;


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.provenance.ProvenanceEventRecord;


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer.unknown;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.provenance.ProvenanceEventRecord;


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer.unknown;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.provenance.ProvenanceEventRecord;


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer.unknown;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.provenance.ProvenanceEventRecord;


@ -17,8 +17,9 @@
package org.apache.nifi.atlas.provenance.lineage;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Id;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.notification.HookNotificationV1;
import org.apache.nifi.atlas.NiFiFlow;
import org.apache.nifi.atlas.NiFiFlowPath;
import org.apache.nifi.atlas.provenance.AnalysisContext;
@ -40,7 +41,6 @@ import java.util.stream.Collectors;
import static org.apache.nifi.atlas.AtlasUtils.toStr;
import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName;
import static org.apache.nifi.atlas.hook.NiFiAtlasHook.NIFI_USER;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_NIFI_FLOW;
@ -49,6 +49,7 @@ import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
import static org.apache.nifi.atlas.hook.NiFiAtlasHook.NIFI_USER;
public abstract class AbstractLineageStrategy implements LineageStrategy {
@ -131,7 +132,7 @@ public abstract class AbstractLineageStrategy implements LineageStrategy {
}
protected void createEntity(Referenceable ... entities) {
final HookNotification.EntityCreateRequest msg = new HookNotification.EntityCreateRequest(NIFI_USER, entities);
final HookNotificationV1.EntityCreateRequest msg = new HookNotificationV1.EntityCreateRequest(NIFI_USER, entities);
lineageContext.addMessage(msg);
}
@ -146,11 +147,11 @@ public abstract class AbstractLineageStrategy implements LineageStrategy {
refsToAdd.stream().filter(ref -> !existingRefTypedQualifiedNames.contains(toTypedQualifiedName.apply(ref)))
.forEach(ref -> {
if (ref.getId().isUnassigned()) {
if (isUnassigned(ref.getId())) {
// Create new entity.
logger.debug("Found a new DataSet reference from {} to {}, sending an EntityCreateRequest",
new Object[]{toTypedQualifiedName.apply(nifiFlowPath), toTypedQualifiedName.apply(ref)});
final HookNotification.EntityCreateRequest createDataSet = new HookNotification.EntityCreateRequest(NIFI_USER, ref);
final HookNotificationV1.EntityCreateRequest createDataSet = new HookNotificationV1.EntityCreateRequest(NIFI_USER, ref);
lineageContext.addMessage(createDataSet);
}
refs.add(ref);
@ -169,10 +170,18 @@ public abstract class AbstractLineageStrategy implements LineageStrategy {
final boolean inputsAdded = addDataSetRefs(dataSetRefs.getInputs(), flowPathRef, ATTR_INPUTS);
final boolean outputsAdded = addDataSetRefs(dataSetRefs.getOutputs(), flowPathRef, ATTR_OUTPUTS);
if (inputsAdded || outputsAdded) {
lineageContext.addMessage(new HookNotification.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH,
lineageContext.addMessage(new HookNotificationV1.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH,
ATTR_QUALIFIED_NAME, (String) flowPathRef.get(ATTR_QUALIFIED_NAME), flowPathRef));
}
}
// Copy of org.apache.atlas.typesystem.persistence.Id.isUnassigned() from v0.8.1. This method does not exist in v2.0.0.
private boolean isUnassigned(Id id) {
try {
long l = Long.parseLong(id.getId());
return l < 0;
} catch (NumberFormatException ne) {
return false;
}
}
}
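
Both helper copies preserve the v0.8.1 Id state rules: unassigned ids are negative numeric placeholders, assigned ids are server-issued GUIDs. A hedged illustration, assuming the v1 Id(id, version, typeName) constructor:

Id unassigned = new Id("-1", 0, "nifi_flow_path");                        // numeric placeholder
Id assigned = new Id(UUID.randomUUID().toString(), 0, "nifi_flow_path");  // GUID-style id
// isUnassigned(unassigned) == true,  isAssigned(unassigned) == false
// isUnassigned(assigned)   == false, isAssigned(assigned)   == true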


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.lineage;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.NiFiFlow;
import org.apache.nifi.atlas.NiFiFlowPath;
import org.apache.nifi.atlas.provenance.AnalysisContext;


@ -16,8 +16,8 @@
*/
package org.apache.nifi.atlas.provenance.lineage;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.model.notification.HookNotification;
public interface LineageContext {
void addMessage(HookNotification.HookNotificationMessage message);
void addMessage(HookNotification message);
}


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.lineage;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.NiFiFlow;
import org.apache.nifi.atlas.NiFiFlowPath;
import org.apache.nifi.atlas.provenance.AnalysisContext;


@ -16,9 +16,56 @@
*/
package org.apache.nifi.atlas.reporting;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_BATCH_SIZE;
import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_START_POSITION;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hook.AtlasHook;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.atlas.NiFiAtlasClient;
import org.apache.nifi.atlas.NiFiFlow;
import org.apache.nifi.atlas.NiFiFlowAnalyzer;
import org.apache.nifi.atlas.hook.NiFiAtlasHook;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.StandardAnalysisContext;
import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage;
import org.apache.nifi.atlas.provenance.lineage.LineageStrategy;
import org.apache.nifi.atlas.provenance.lineage.SimpleFlowPathLineage;
import org.apache.nifi.atlas.resolver.ClusterResolver;
import org.apache.nifi.atlas.resolver.ClusterResolvers;
import org.apache.nifi.atlas.resolver.RegexClusterResolver;
import org.apache.nifi.atlas.security.AtlasAuthN;
import org.apache.nifi.atlas.security.Basic;
import org.apache.nifi.atlas.security.Kerberos;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
import org.apache.nifi.ssl.SSLContextService;
import java.io.File;
import java.io.FileInputStream;
@ -44,56 +91,9 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasServiceException;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.atlas.hook.NiFiAtlasHook;
import org.apache.nifi.atlas.NiFiAtlasClient;
import org.apache.nifi.atlas.NiFiFlow;
import org.apache.nifi.atlas.NiFiFlowAnalyzer;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.StandardAnalysisContext;
import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage;
import org.apache.nifi.atlas.provenance.lineage.LineageStrategy;
import org.apache.nifi.atlas.provenance.lineage.SimpleFlowPathLineage;
import org.apache.nifi.atlas.resolver.ClusterResolver;
import org.apache.nifi.atlas.resolver.ClusterResolvers;
import org.apache.nifi.atlas.resolver.RegexClusterResolver;
import org.apache.nifi.atlas.security.AtlasAuthN;
import org.apache.nifi.atlas.security.Basic;
import org.apache.nifi.atlas.security.Kerberos;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
import org.apache.nifi.ssl.SSLContextService;
import com.sun.jersey.api.client.ClientResponse;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_BATCH_SIZE;
import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_START_POSITION;
@Tags({"atlas", "lineage"})
@CapabilityDescription("Report NiFi flow data set level lineage to Apache Atlas." +
@ -552,6 +552,8 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
// Create Atlas configuration file if necessary.
if (createAtlasConf) {
// enforce synchronous notification sending (needed for the checkpointing in ProvenanceEventConsumer)
atlasProperties.setProperty(AtlasHook.ATLAS_NOTIFICATION_ASYNCHRONOUS, "false");
atlasProperties.put(ATLAS_PROPERTY_CLIENT_CONNECT_TIMEOUT_MS, atlasConnectTimeoutMs);
atlasProperties.put(ATLAS_PROPERTY_CLIENT_READ_TIMEOUT_MS, atlasReadTimeoutMs);
@ -568,6 +570,13 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
.format(Instant.now());
atlasProperties.store(fos, "Generated by Apache NiFi ReportLineageToAtlas ReportingTask at " + ts);
}
} else {
// check if synchronous notification sending has been set (needed for the checkpointing in ProvenanceEventConsumer)
String isAsync = atlasProperties.getProperty(AtlasHook.ATLAS_NOTIFICATION_ASYNCHRONOUS);
if (isAsync == null || !isAsync.equalsIgnoreCase("false")) {
throw new ProcessException("Atlas property '" + AtlasHook.ATLAS_NOTIFICATION_ASYNCHRONOUS + "' must be set to 'false' in " + ATLAS_PROPERTIES_FILENAME + "." +
" Sending notifications asynchronously is not supported by the reporting task.");
}
}
getLogger().debug("Force reloading Atlas application properties.");
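
For a user-provided configuration, the entry the new validation expects would look like this in atlas-application.properties (key hedged as the value AtlasHook.ATLAS_NOTIFICATION_ASYNCHRONOUS is expected to resolve to):

# required by the reporting task: send hook notifications synchronously
atlas.notification.hook.asynchronous=false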


@ -16,15 +16,6 @@
*/
package org.apache.nifi.atlas.security;
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_KEYTAB;
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_PRINCIPAL;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import org.apache.atlas.AtlasClientV2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
@ -34,6 +25,15 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_KEYTAB;
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_PRINCIPAL;
public class Kerberos implements AtlasAuthN {
private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";


@ -22,10 +22,11 @@ import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.notification.HookNotificationV1;
import org.apache.nifi.atlas.AtlasUtils;
import org.apache.nifi.atlas.NiFiTypes;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
@ -94,19 +95,19 @@ public class AtlasAPIV2ServerEmulator {
server.start();
logger.info("Starting {} on port {}", AtlasAPIV2ServerEmulator.class.getSimpleName(), httpConnector.getLocalPort());
embeddedKafka = new EmbeddedKafka(false);
embeddedKafka = new EmbeddedKafka();
embeddedKafka.start();
notificationServerEmulator.consume(m -> {
if (m instanceof HookNotification.EntityCreateRequest) {
HookNotification.EntityCreateRequest em = (HookNotification.EntityCreateRequest) m;
if (m instanceof HookNotificationV1.EntityCreateRequest) {
HookNotificationV1.EntityCreateRequest em = (HookNotificationV1.EntityCreateRequest) m;
for (Referenceable ref : em.getEntities()) {
final AtlasEntity entity = toEntity(ref);
createEntityByNotification(entity);
}
} else if (m instanceof HookNotification.EntityPartialUpdateRequest) {
HookNotification.EntityPartialUpdateRequest em
= (HookNotification.EntityPartialUpdateRequest) m;
} else if (m instanceof HookNotificationV1.EntityPartialUpdateRequest) {
HookNotificationV1.EntityPartialUpdateRequest em
= (HookNotificationV1.EntityPartialUpdateRequest) m;
final AtlasEntity entity = toEntity(em.getEntity());
entity.setAttribute(em.getAttribute(), em.getAttributeValue());
updateEntityByNotification(entity);
@ -241,7 +242,11 @@ public class AtlasAPIV2ServerEmulator {
}
private static <T> T readInputJSON(HttpServletRequest req, Class<? extends T> clazz) throws IOException {
return new ObjectMapper().reader().withType(clazz).readValue(req.getInputStream());
return new ObjectMapper()
.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.reader()
.withType(clazz)
.readValue(req.getInputStream());
}
private static final AtlasTypesDef atlasTypesDef = new AtlasTypesDef();


@ -16,9 +16,9 @@
*/
package org.apache.nifi.atlas.emulator;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.notification.MessageDeserializer;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@ -36,7 +36,7 @@ public class AtlasNotificationServerEmulator {
private volatile boolean isStopped;
public void consume(Consumer<HookNotification.HookNotificationMessage> c) {
public void consume(Consumer<HookNotification> c) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
@ -53,8 +53,8 @@ public class AtlasNotificationServerEmulator {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
final MessageDeserializer deserializer = NotificationInterface.NotificationType.HOOK.getDeserializer();
final HookNotification.HookNotificationMessage m
= (HookNotification.HookNotificationMessage) deserializer.deserialize(record.value());
final HookNotification m
= (HookNotification) deserializer.deserialize(record.value());
c.accept(m);
}
}


@ -28,8 +28,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Properties;
/**
@ -47,10 +45,6 @@ public class EmbeddedKafka {
private final Logger logger = LoggerFactory.getLogger(EmbeddedKafka.class);
private final int kafkaPort;
private final int zookeeperPort;
private boolean started;
/**
@ -58,8 +52,8 @@ public class EmbeddedKafka {
* configuration properties will be loaded from 'server.properties' and
* 'zookeeper.properties' located at the root of the classpath.
*/
public EmbeddedKafka(boolean useRandomPort) {
this(loadPropertiesFromClasspath("/server.properties"), loadPropertiesFromClasspath("/zookeeper.properties"), useRandomPort);
public EmbeddedKafka() {
this(loadPropertiesFromClasspath("/server.properties"), loadPropertiesFromClasspath("/zookeeper.properties"));
}
/**
@ -70,47 +64,14 @@ public class EmbeddedKafka {
* @param zookeeperConfig
* Zookeeper configuration properties
*/
public EmbeddedKafka(Properties kafkaConfig, Properties zookeeperConfig, boolean useRandomPort) {
public EmbeddedKafka(Properties kafkaConfig, Properties zookeeperConfig) {
this.cleanupKafkaWorkDir();
this.zookeeperConfig = zookeeperConfig;
this.kafkaConfig = kafkaConfig;
this.zookeeperConfig = zookeeperConfig;
if (useRandomPort) {
this.kafkaPort = this.availablePort();
this.zookeeperPort = this.availablePort();
this.kafkaConfig.setProperty("port", String.valueOf(this.kafkaPort));
this.kafkaConfig.setProperty("zookeeper.connect", "localhost:" + this.zookeeperPort);
this.zookeeperConfig.setProperty("clientPort", String.valueOf(this.zookeeperPort));
} else {
this.kafkaPort = Integer.parseInt(kafkaConfig.getProperty("port"));
this.zookeeperPort = Integer.parseInt(zookeeperConfig.getProperty("clientPort"));
}
this.zkServer = new ZooKeeperServer();
this.kafkaServer = new KafkaServerStartable(new KafkaConfig(kafkaConfig));
}
/**
*
* @return port for Kafka server
*/
public int getKafkaPort() {
if (!this.started) {
throw new IllegalStateException("Kafka server is not started. Kafka port can't be determined.");
}
return this.kafkaPort;
}
/**
*
* @return port for Zookeeper server
*/
public int getZookeeperPort() {
if (!this.started) {
throw new IllegalStateException("Kafka server is not started. Zookeeper port can't be determined.");
}
return this.zookeeperPort;
this.zkServer = new ZooKeeperServer();
}
/**
@ -127,7 +88,7 @@ public class EmbeddedKafka {
logger.info("Starting Kafka server");
this.kafkaServer.startup();
logger.info("Embedded Kafka is started at localhost:" + this.kafkaServer.serverConfig().port()
logger.info("Embedded Kafka is started at localhost:" + this.kafkaServer.staticServerConfig().port()
+ ". Zookeeper connection string: " + this.kafkaConfig.getProperty("zookeeper.connect"));
this.started = true;
}
@ -209,24 +170,4 @@ public class EmbeddedKafka {
throw new IllegalStateException(e);
}
}
/**
* Will determine the available port used by Kafka/Zookeeper servers.
*/
private int availablePort() {
ServerSocket s = null;
try {
s = new ServerSocket(0);
s.setReuseAddress(true);
return s.getLocalPort();
} catch (Exception e) {
throw new IllegalStateException("Failed to discover available port.", e);
} finally {
try {
s.close();
} catch (IOException e) {
// ignore
}
}
}
}
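
With the random-port machinery removed, broker and Zookeeper ports now come solely from the classpath property files. A minimal sketch of the surviving API:

EmbeddedKafka kafka = new EmbeddedKafka(); // loads /server.properties and /zookeeper.properties
kafka.start();                             // starts Zookeeper, then Kafka, on the configured ports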


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.hook;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.model.notification.HookNotification;
import org.junit.Before;
import org.junit.Test;
@ -40,16 +40,16 @@ public class TestNiFiAtlasHook {
@Test
public void messagesListShouldContainMessagesAfterAddMessage() {
hook.addMessage(new HookNotification.HookNotificationMessage(HookNotification.HookNotificationType.ENTITY_CREATE, "nifi"));
hook.addMessage(new HookNotification.HookNotificationMessage(HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE, "nifi"));
hook.addMessage(new HookNotification(HookNotification.HookNotificationType.ENTITY_CREATE, "nifi"));
hook.addMessage(new HookNotification(HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE, "nifi"));
assertEquals(2, hook.getMessages().size());
}
@Test
public void messagesListShouldBeCleanedUpAfterCommit() {
hook.addMessage(new HookNotification.HookNotificationMessage(HookNotification.HookNotificationType.ENTITY_CREATE, "nifi"));
hook.addMessage(new HookNotification.HookNotificationMessage(HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE, "nifi"));
hook.addMessage(new HookNotification(HookNotification.HookNotificationType.ENTITY_CREATE, "nifi"));
hook.addMessage(new HookNotification(HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE, "nifi"));
hook.commitMessages();


@ -19,8 +19,10 @@ package org.apache.nifi.atlas.hook;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.notification.HookNotificationV1;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.atlas.AtlasUtils;
import org.apache.nifi.atlas.NiFiAtlasClient;
import org.junit.Test;
@ -34,7 +36,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import static java.util.Arrays.asList;
@ -60,10 +62,10 @@ public class TestNotificationSender {
private static final Logger logger = LoggerFactory.getLogger(TestNotificationSender.class);
private static class Notifier implements Consumer<List<HookNotification.HookNotificationMessage>> {
private final List<List<HookNotification.HookNotificationMessage>> notifications = new ArrayList<>();
private static class Notifier implements BiConsumer<List<HookNotification>, UserGroupInformation> {
private final List<List<HookNotification>> notifications = new ArrayList<>();
@Override
public void accept(List<HookNotification.HookNotificationMessage> messages) {
public void accept(List<HookNotification> messages, UserGroupInformation ugi) {
logger.info("notified at {}, {}", notifications.size(), messages);
notifications.add(messages);
}
@ -72,7 +74,7 @@ public class TestNotificationSender {
@Test
public void testZeroMessage() {
final NotificationSender sender = new NotificationSender();
final List<HookNotification.HookNotificationMessage> messages = Collections.emptyList();
final List<HookNotification> messages = Collections.emptyList();
final Notifier notifier = new Notifier();
sender.send(messages, notifier);
assertEquals(0, notifier.notifications.get(0).size());
@ -88,9 +90,9 @@ public class TestNotificationSender {
@SuppressWarnings("unchecked")
private void assertCreateMessage(Notifier notifier, int notificationIndex, Referenceable ... expects) {
assertTrue(notifier.notifications.size() > notificationIndex);
final List<HookNotification.HookNotificationMessage> messages = notifier.notifications.get(notificationIndex);
final List<HookNotification> messages = notifier.notifications.get(notificationIndex);
assertEquals(1, messages.size());
final HookNotification.EntityCreateRequest message = (HookNotification.EntityCreateRequest) messages.get(0);
final HookNotificationV1.EntityCreateRequest message = (HookNotificationV1.EntityCreateRequest) messages.get(0);
assertEquals(expects.length, message.getEntities().size());
// The use of 'flatMap' at NotificationSender does not preserve the actual entity order.
@ -135,11 +137,11 @@ public class TestNotificationSender {
@SuppressWarnings("unchecked")
private void assertUpdateFlowPathMessage(Notifier notifier, int notificationIndex, Referenceable ... expects) {
assertTrue(notifier.notifications.size() > notificationIndex);
final List<HookNotification.HookNotificationMessage> messages = notifier.notifications.get(notificationIndex);
final List<HookNotification> messages = notifier.notifications.get(notificationIndex);
assertEquals(expects.length, messages.size());
for (int i = 0; i < expects.length; i++) {
final Referenceable expect = expects[i];
final HookNotification.EntityPartialUpdateRequest actual = (HookNotification.EntityPartialUpdateRequest) messages.get(i);
final HookNotificationV1.EntityPartialUpdateRequest actual = (HookNotificationV1.EntityPartialUpdateRequest) messages.get(i);
assertEquals(expect.getTypeName(), actual.getTypeName());
assertEquals(ATTR_QUALIFIED_NAME, actual.getAttribute());
assertEquals(expect.get(ATTR_QUALIFIED_NAME), actual.getAttributeValue());
@ -155,8 +157,8 @@ public class TestNotificationSender {
public void testOneCreateDataSetMessage() {
final NotificationSender sender = new NotificationSender();
final Referenceable queue1 = createRef(TYPE_NIFI_QUEUE, "queue1@test");
final List<HookNotification.HookNotificationMessage> messages = Collections.singletonList(
new HookNotification.EntityCreateRequest(NIFI_USER, queue1));
final List<HookNotification> messages = Collections.singletonList(
new HookNotificationV1.EntityCreateRequest(NIFI_USER, queue1));
final Notifier notifier = new Notifier();
sender.send(messages, notifier);
@ -275,46 +277,46 @@ public class TestNotificationSender {
ff4_pathB22.set(ATTR_OUTPUTS, singleton(ff4_data22));
final List<HookNotification.HookNotificationMessage> messages = asList(
new HookNotification.EntityCreateRequest(NIFI_USER, ff1_data1),
new HookNotification.EntityCreateRequest(NIFI_USER, ff1_data11),
new HookNotification.EntityCreateRequest(NIFI_USER, ff1_data12),
new HookNotification.EntityCreateRequest(NIFI_USER, ff1_pathA11),
new HookNotification.EntityCreateRequest(NIFI_USER, ff1_pathA12),
new HookNotification.EntityCreateRequest(NIFI_USER, ff1_fork11),
new HookNotification.EntityCreateRequest(NIFI_USER, ff1_fork12),
new HookNotification.EntityCreateRequest(NIFI_USER, ff1_pathB11),
new HookNotification.EntityCreateRequest(NIFI_USER, ff1_pathB12),
final List<HookNotification> messages = asList(
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_data1),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_data11),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_data12),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_pathA11),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_pathA12),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_fork11),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_fork12),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_pathB11),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_pathB12),
new HookNotification.EntityCreateRequest(NIFI_USER, ff2_data2),
new HookNotification.EntityCreateRequest(NIFI_USER, ff2_data21),
new HookNotification.EntityCreateRequest(NIFI_USER, ff2_data22),
new HookNotification.EntityCreateRequest(NIFI_USER, ff2_pathA21),
new HookNotification.EntityCreateRequest(NIFI_USER, ff2_pathA22),
new HookNotification.EntityCreateRequest(NIFI_USER, ff2_fork21),
new HookNotification.EntityCreateRequest(NIFI_USER, ff2_fork22),
new HookNotification.EntityCreateRequest(NIFI_USER, ff2_pathB21),
new HookNotification.EntityCreateRequest(NIFI_USER, ff2_pathB22),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_data2),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_data21),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_data22),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_pathA21),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_pathA22),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_fork21),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_fork22),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_pathB21),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_pathB22),
new HookNotification.EntityCreateRequest(NIFI_USER, ff3_data1),
new HookNotification.EntityCreateRequest(NIFI_USER, ff3_data11),
new HookNotification.EntityCreateRequest(NIFI_USER, ff3_data12),
new HookNotification.EntityCreateRequest(NIFI_USER, ff3_pathA11),
new HookNotification.EntityCreateRequest(NIFI_USER, ff3_pathA12),
new HookNotification.EntityCreateRequest(NIFI_USER, ff3_fork11),
new HookNotification.EntityCreateRequest(NIFI_USER, ff3_fork12),
new HookNotification.EntityCreateRequest(NIFI_USER, ff3_pathB11),
new HookNotification.EntityCreateRequest(NIFI_USER, ff3_pathB12),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_data1),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_data11),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_data12),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_pathA11),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_pathA12),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_fork11),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_fork12),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_pathB11),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_pathB12),
new HookNotification.EntityCreateRequest(NIFI_USER, ff4_data2),
new HookNotification.EntityCreateRequest(NIFI_USER, ff4_data21),
new HookNotification.EntityCreateRequest(NIFI_USER, ff4_data22),
new HookNotification.EntityCreateRequest(NIFI_USER, ff4_pathA21),
new HookNotification.EntityCreateRequest(NIFI_USER, ff4_pathA22),
new HookNotification.EntityCreateRequest(NIFI_USER, ff4_fork21),
new HookNotification.EntityCreateRequest(NIFI_USER, ff4_fork22),
new HookNotification.EntityCreateRequest(NIFI_USER, ff4_pathB21),
new HookNotification.EntityCreateRequest(NIFI_USER, ff4_pathB22)
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_data2),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_data21),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_data22),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_pathA21),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_pathA22),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_fork21),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_fork22),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_pathB21),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_pathB22)
);
final Notifier notifier = new Notifier();
@ -376,10 +378,10 @@ public class TestNotificationSender {
newPath1Lineage.set(ATTR_INPUTS, singleton(fileC));
newPath1Lineage.set(ATTR_OUTPUTS, singleton(fileD));
final List<HookNotification.HookNotificationMessage> messages = asList(
new HookNotification.EntityCreateRequest(NIFI_USER, fileC),
new HookNotification.EntityCreateRequest(NIFI_USER, fileD),
new HookNotification.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME, "path1@test", newPath1Lineage)
final List<HookNotification> messages = asList(
new HookNotificationV1.EntityCreateRequest(NIFI_USER, fileC),
new HookNotificationV1.EntityCreateRequest(NIFI_USER, fileD),
new HookNotificationV1.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME, "path1@test", newPath1Lineage)
);
final NiFiAtlasClient atlasClient = mock(NiFiAtlasClient.class);


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;


@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;


@ -26,6 +26,7 @@ import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceRepository;
@ -123,7 +124,38 @@ public class ITReportLineageToAtlas {
throw new RuntimeException("Failed to parse template", e);
}
return handler.getRootProcessGroupStatus();
ProcessGroupStatus rootPgStatus = handler.getRootProcessGroupStatus();
for (ConnectionStatus connectionStatus : rootPgStatus.getConnectionStatus()) {
connectionStatus.setSourceName(lookupComponentName(rootPgStatus, connectionStatus.getSourceId()));
connectionStatus.setDestinationName(lookupComponentName(rootPgStatus, connectionStatus.getDestinationId()));
}
return rootPgStatus;
}
private String lookupComponentName(ProcessGroupStatus rootPgStatus, String componentId) {
for (ProcessorStatus processorStatus : rootPgStatus.getProcessorStatus()) {
if (processorStatus.getId().equals(componentId)) {
return processorStatus.getName();
}
}
for (PortStatus portStatus : rootPgStatus.getInputPortStatus()) {
if (portStatus.getId().equals(componentId)) {
return portStatus.getName();
}
}
for (PortStatus portStatus : rootPgStatus.getOutputPortStatus()) {
if (portStatus.getId().equals(componentId)) {
return portStatus.getName();
}
}
for (RemoteProcessGroupStatus remoteProcessGroupStatus : rootPgStatus.getRemoteProcessGroupStatus()) {
if (remoteProcessGroupStatus.getId().equals(componentId)) {
return remoteProcessGroupStatus.getName();
}
}
return null; // funnels do not have names
}
private static class TemplateContentHander implements ContentHandler {


@ -17,11 +17,15 @@
package org.apache.nifi.atlas.reporting;
import com.sun.jersey.api.client.Client;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.hook.AtlasHook;
import org.apache.commons.configuration.Configuration;
import org.apache.nifi.atlas.NiFiAtlasClient;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.util.MockComponentLog;
@ -35,11 +39,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@ -54,6 +61,7 @@ import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_URLS;
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_USER;
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.KAFKA_BOOTSTRAP_SERVERS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@ -128,7 +136,9 @@ public class TestReportLineageToAtlas {
@Test
public void testDefaultConnectAndReadTimeout() throws Exception {
// GIVEN
Map<PropertyDescriptor, String> properties = new HashMap<>();
String atlasConfDir = createAtlasConfDir();
Map<PropertyDescriptor, String> properties = initReportingTaskProperties(atlasConfDir);
// WHEN
// THEN
@ -141,21 +151,12 @@ public class TestReportLineageToAtlas {
int expectedConnectTimeoutMs = 10000;
int expectedReadTimeoutMs = 5000;
String atlasConfDir = "target/atlasConfDir";
File directory = new File(atlasConfDir);
if (!directory.exists()) {
directory.mkdirs();
}
String atlasConfDir = createAtlasConfDir();
Map<PropertyDescriptor, String> properties = new HashMap<>();
Map<PropertyDescriptor, String> properties = initReportingTaskProperties(atlasConfDir);
properties.put(ATLAS_CONNECT_TIMEOUT, (expectedConnectTimeoutMs / 1000) + " sec");
properties.put(ATLAS_READ_TIMEOUT, (expectedReadTimeoutMs / 1000) + " sec");
properties.put(ATLAS_CONF_DIR, atlasConfDir);
properties.put(ATLAS_CONF_CREATE, "true");
properties.put(ATLAS_DEFAULT_CLUSTER_NAME, "defaultClusterName");
properties.put(KAFKA_BOOTSTRAP_SERVERS, "http://localhost:6667");
// WHEN
// THEN
testConnectAndReadTimeout(properties, expectedConnectTimeoutMs, expectedReadTimeoutMs);
@ -163,11 +164,6 @@ public class TestReportLineageToAtlas {
private void testConnectAndReadTimeout(Map<PropertyDescriptor, String> properties, Integer expectedConnectTimeout, Integer expectedReadTimeout) throws Exception {
// GIVEN
properties.put(ATLAS_NIFI_URL, "http://localhost:8080/nifi");
properties.put(ATLAS_URLS, "http://localhost:27000");
properties.put(ATLAS_USER, "admin");
properties.put(ATLAS_PASSWORD, "admin123");
reportingContext = mock(ReportingContext.class);
when(reportingContext.getProperties()).thenReturn(properties);
when(reportingContext.getProperty(any())).then(invocation -> new MockPropertyValue(properties.get(invocation.getArguments()[0])));
@ -200,4 +196,83 @@ public class TestReportLineageToAtlas {
assertEquals(expectedConnectTimeout, actualConnectTimeout);
assertEquals(expectedReadTimeout, actualReadTimeout);
}
@Test
public void testNotificationSendingIsSynchronousWhenAtlasConfIsGenerated() throws Exception {
String atlasConfDir = createAtlasConfDir();
Map<PropertyDescriptor, String> properties = initReportingTaskProperties(atlasConfDir);
testNotificationSendingIsSynchronous(properties);
}
@Test
public void testNotificationSendingIsSynchronousWhenAtlasConfIsProvidedAndSynchronousModeHasBeenSet() throws Exception {
String atlasConfDir = createAtlasConfDir();
Properties atlasConf = new Properties();
atlasConf.setProperty(AtlasHook.ATLAS_NOTIFICATION_ASYNCHRONOUS, "false");
saveAtlasConf(atlasConfDir, atlasConf);
Map<PropertyDescriptor, String> properties = initReportingTaskProperties(atlasConfDir);
properties.put(ATLAS_CONF_CREATE, "false");
testNotificationSendingIsSynchronous(properties);
}
private void testNotificationSendingIsSynchronous(Map<PropertyDescriptor, String> properties) throws Exception {
ConfigurationContext configurationContext = new MockConfigurationContext(properties, null);
testSubject.initialize(initializationContext);
testSubject.setup(configurationContext);
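// setup() loads (or generates) atlas-application.properties;
// assert that the hook ended up in synchronous notification mode.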
Configuration atlasProperties = ApplicationProperties.get();
boolean isAsync = atlasProperties.getBoolean(AtlasHook.ATLAS_NOTIFICATION_ASYNCHRONOUS, Boolean.TRUE);
assertFalse(isAsync);
}
@Test(expected = ProcessException.class)
public void testThrowExceptionWhenAtlasConfIsProvidedButSynchronousModeHasNotBeenSet() throws Exception {
String atlasConfDir = createAtlasConfDir();
Properties atlasConf = new Properties();
saveAtlasConf(atlasConfDir, atlasConf);
Map<PropertyDescriptor, String> properties = initReportingTaskProperties(atlasConfDir);
properties.put(ATLAS_CONF_CREATE, "false");
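// The user-provided config never sets atlas.notification.hook.asynchronous=false,
// so setup() is expected to fail with a ProcessException.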
ConfigurationContext configurationContext = new MockConfigurationContext(properties, null);
testSubject.initialize(initializationContext);
testSubject.setup(configurationContext);
}
private String createAtlasConfDir() {
String atlasConfDir = "target/atlasConfDir";
File directory = new File(atlasConfDir);
if (!directory.exists() && !directory.mkdirs()) {
throw new IllegalStateException("Could not create Atlas conf directory: " + atlasConfDir);
}
return atlasConfDir;
}
private void saveAtlasConf(String atlasConfDir, Properties atlasConf) throws IOException {
// try-with-resources closes the stream even if store() throws
try (FileOutputStream fos = new FileOutputStream(atlasConfDir + File.separator + ApplicationProperties.APPLICATION_PROPERTIES)) {
atlasConf.store(fos, "Atlas test config");
}
}
private Map<PropertyDescriptor, String> initReportingTaskProperties(String atlasConfDir) {
Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(ATLAS_URLS, "http://localhost:21000");
properties.put(ATLAS_NIFI_URL, "http://localhost:8080/nifi");
properties.put(ATLAS_CONF_DIR, atlasConfDir);
properties.put(ATLAS_CONF_CREATE, "true");
properties.put(ATLAS_DEFAULT_CLUSTER_NAME, "defaultClusterName");
properties.put(ATLAS_USER, "admin");
properties.put(ATLAS_PASSWORD, "password");
properties.put(KAFKA_BOOTSTRAP_SERVERS, "http://localhost:9092");
return properties;
}
}
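
For orientation, the AtlasClientV2 referenced in the imports above replaces the old Jersey-based atlas-client API. A minimal construction sketch, assuming the two-array basic-auth constructor of the Atlas 2.0.0 client; the URL and credentials are placeholders:

import org.apache.atlas.AtlasClientV2;

public class AtlasClientSketch {
    public static void main(String[] args) {
        // Basic-auth client against a single Atlas server; Kerberized
        // deployments use the UserGroupInformation-based constructors instead.
        AtlasClientV2 atlasClient = new AtlasClientV2(
                new String[] {"http://localhost:21000"},
                new String[] {"admin", "admin"});
        // The client exposes the typedef and entity CRUD calls that
        // NiFiAtlasClient builds on.
    }
}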

View File

@ -16,3 +16,5 @@ atlas.cluster.name=AtlasCluster
# atlas.kafka.bootstrap.servers=atlas.example.com:6667
atlas.kafka.bootstrap.servers=localhost:9092
atlas.notification.hook.asynchronous=false

View File

@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
@ -21,25 +22,26 @@ broker.id=0
############################# Socket Server Settings #############################
# The port the socket server listens on
port=9092
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>
# The number of threads handling network requests
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads doing disk I/O
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
@ -54,7 +56,7 @@ socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma seperated list of directories under which to store log files
# A comma separated list of directories under which to store log files
log.dirs=target/kafka-tmp/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
@ -66,6 +68,13 @@ num.partitions=1
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
@ -73,7 +82,7 @@ num.recovery.threads.per.data.dir=1
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
@ -90,11 +99,11 @@ num.recovery.threads.per.data.dir=1
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
@ -104,10 +113,6 @@ log.segment.bytes=1073741824
# to the retention policies
log.retention.check.interval.ms=300000
# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
@ -119,3 +124,13 @@ zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
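
With the broker configured above, the Atlas hook publishes its notifications to the ATLAS_HOOK topic on localhost:9092. As a quick way to eyeball those messages during test runs, a throwaway consumer along these lines works; the topic name follows the Atlas convention, while the group id and poll timeout are arbitrary:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AtlasHookTopicPeek {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // matches port=9092 above
        props.put("group.id", "atlas-hook-peek");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("ATLAS_HOOK"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value()); // JSON-serialized hook notification
            }
        }
    }
}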

View File

@ -22,13 +22,13 @@
<version>1.11.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-atlas-bundle</artifactId>
<version>1.11.0-SNAPSHOT</version>
<packaging>pom</packaging>
<properties>
<atlas.version>0.8.1</atlas.version>
<atlas.version>2.0.0</atlas.version>
</properties>
<modules>
<module>nifi-atlas-reporting-task</module>
<module>nifi-atlas-nar</module>
@ -36,95 +36,21 @@
<dependencyManagement>
<dependencies>
<dependency>
<!-- Explicitly force Netty to 3.7.1 due to CVE-2014-0193 -->
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>3.7.1.Final</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-atlas-reporting-task</artifactId>
<version>1.11.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-client-dto</artifactId>
<version>1.11.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-client</artifactId>
<artifactId>atlas-client-v2</artifactId>
<version>${atlas.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
</exclusion>
<!--
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
</exclusion>
<!-
Exclude these Atlas typesystem to reduce dependency size.
Use atlas-intg and atlas-common instead.
->
<exclusion>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-typesystem</artifactId>
</exclusion>
-->
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-intg</artifactId>
<version>${atlas.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-common</artifactId>
<version>${atlas.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@ -136,12 +62,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<!--
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
-->
</exclusions>
</dependency>
</dependencies>