mirror of https://github.com/apache/nifi.git
NIFI-4707: Improved S2SProvenanceReportingTask
- Simplified consumeEvents method signature - Refactored ComponentMapHolder methods visibility - Renamed componentMap to componentNameMap - Map more metadata from ConnectionStatus for Remote Input/Output Ports - Support Process Group hierarchy filtering - Throw an exception when the reporting task fails to send provenance data, so that the current provenance event index is kept and the events can be consumed again
This commit is contained in:
parent
1f793923a4
commit
d65e6b2563
|
@ -640,7 +640,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
|
||||||
final AnalysisContext analysisContext = new StandardAnalysisContext(nifiFlow, clusterResolvers,
|
final AnalysisContext analysisContext = new StandardAnalysisContext(nifiFlow, clusterResolvers,
|
||||||
// FIXME: This class cast shouldn't be necessary to query lineage. Possible refactor target in next major update.
|
// FIXME: This class cast shouldn't be necessary to query lineage. Possible refactor target in next major update.
|
||||||
(ProvenanceRepository)eventAccess.getProvenanceRepository());
|
(ProvenanceRepository)eventAccess.getProvenanceRepository());
|
||||||
consumer.consumeEvents(context, context.getStateManager(), (componentMapHolder, events) -> {
|
consumer.consumeEvents(context, (componentMapHolder, events) -> {
|
||||||
for (ProvenanceEventRecord event : events) {
|
for (ProvenanceEventRecord event : events) {
|
||||||
try {
|
try {
|
||||||
lineageStrategy.processEvent(analysisContext, nifiFlow, event);
|
lineageStrategy.processEvent(analysisContext, nifiFlow, event);
|
||||||
|
|
|
@ -24,68 +24,95 @@ import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Stack;
|
||||||
|
|
||||||
public class ComponentMapHolder {
|
public class ComponentMapHolder {
|
||||||
final Map<String,String> componentMap = new HashMap<>();
|
private static final String REMOTE_INPUT_PORT = "Remote Input Port";
|
||||||
final Map<String,String> componentToParentGroupMap = new HashMap<>();
|
private static final String REMOTE_OUTPUT_PORT = "Remote Output Port";
|
||||||
|
private final Map<String,String> componentNameMap = new HashMap<>();
|
||||||
|
private final Map<String,String> componentToParentGroupMap = new HashMap<>();
|
||||||
|
private final Map<String,String> sourceToConnectionParentGroupMap = new HashMap<>();
|
||||||
|
private final Map<String,String> destinationToConnectionParentGroupMap = new HashMap<>();
|
||||||
|
|
||||||
public ComponentMapHolder putAll(ComponentMapHolder holder) {
|
private ComponentMapHolder putAll(ComponentMapHolder holder) {
|
||||||
this.componentMap.putAll(holder.getComponentMap());
|
this.componentNameMap.putAll(holder.componentNameMap);
|
||||||
this.componentToParentGroupMap.putAll(holder.getComponentToParentGroupMap());
|
this.componentToParentGroupMap.putAll(holder.componentToParentGroupMap);
|
||||||
|
this.sourceToConnectionParentGroupMap.putAll(holder.sourceToConnectionParentGroupMap);
|
||||||
|
this.destinationToConnectionParentGroupMap.putAll(holder.destinationToConnectionParentGroupMap);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, String> getComponentMap() {
|
|
||||||
return componentMap;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Map<String, String> getComponentToParentGroupMap() {
|
|
||||||
return componentToParentGroupMap;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getComponentName(final String componentId) {
|
public String getComponentName(final String componentId) {
|
||||||
return componentMap.get(componentId);
|
return componentNameMap.get(componentId);
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getProcessGroupId(final String componentId) {
|
public Stack<String> getProcessGroupIdStack(final String startingProcessGroupId) {
|
||||||
|
final Stack<String> stack = new Stack<>();
|
||||||
|
String processGroupId = startingProcessGroupId;
|
||||||
|
stack.push(startingProcessGroupId);
|
||||||
|
while (componentToParentGroupMap.containsKey(processGroupId)) {
|
||||||
|
final String parentGroupId = componentToParentGroupMap.get(processGroupId);
|
||||||
|
if (parentGroupId == null || parentGroupId.isEmpty()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
stack.push(parentGroupId);
|
||||||
|
processGroupId = parentGroupId;
|
||||||
|
}
|
||||||
|
return stack;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getProcessGroupId(final String componentId, final String componentType) {
|
||||||
|
// Where a Remote Input/Output Port resides is only available at ConnectionStatus.
|
||||||
|
if (REMOTE_INPUT_PORT.equals(componentType)) {
|
||||||
|
return destinationToConnectionParentGroupMap.get(componentId);
|
||||||
|
} else if (REMOTE_OUTPUT_PORT.equals(componentType)) {
|
||||||
|
return sourceToConnectionParentGroupMap.get(componentId);
|
||||||
|
}
|
||||||
return componentToParentGroupMap.get(componentId);
|
return componentToParentGroupMap.get(componentId);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ComponentMapHolder createComponentMap(final ProcessGroupStatus status) {
|
public static ComponentMapHolder createComponentMap(final ProcessGroupStatus status) {
|
||||||
final ComponentMapHolder holder = new ComponentMapHolder();
|
final ComponentMapHolder holder = new ComponentMapHolder();
|
||||||
final Map<String,String> componentMap = holder.getComponentMap();
|
final Map<String,String> componentNameMap = holder.componentNameMap;
|
||||||
final Map<String,String> componentToParentGroupMap = holder.getComponentToParentGroupMap();
|
final Map<String,String> componentToParentGroupMap = holder.componentToParentGroupMap;
|
||||||
|
final Map<String,String> sourceToConnectionParentGroupMap = holder.sourceToConnectionParentGroupMap;
|
||||||
|
final Map<String,String> destinationToConnectionParentGroupMap = holder.destinationToConnectionParentGroupMap;
|
||||||
|
|
||||||
if (status != null) {
|
if (status != null) {
|
||||||
componentMap.put(status.getId(), status.getName());
|
componentNameMap.put(status.getId(), status.getName());
|
||||||
|
|
||||||
for (final ProcessorStatus procStatus : status.getProcessorStatus()) {
|
for (final ProcessorStatus procStatus : status.getProcessorStatus()) {
|
||||||
componentMap.put(procStatus.getId(), procStatus.getName());
|
componentNameMap.put(procStatus.getId(), procStatus.getName());
|
||||||
componentToParentGroupMap.put(procStatus.getId(), status.getId());
|
componentToParentGroupMap.put(procStatus.getId(), status.getId());
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final PortStatus portStatus : status.getInputPortStatus()) {
|
for (final PortStatus portStatus : status.getInputPortStatus()) {
|
||||||
componentMap.put(portStatus.getId(), portStatus.getName());
|
componentNameMap.put(portStatus.getId(), portStatus.getName());
|
||||||
componentToParentGroupMap.put(portStatus.getId(), status.getId());
|
componentToParentGroupMap.put(portStatus.getId(), status.getId());
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final PortStatus portStatus : status.getOutputPortStatus()) {
|
for (final PortStatus portStatus : status.getOutputPortStatus()) {
|
||||||
componentMap.put(portStatus.getId(), portStatus.getName());
|
componentNameMap.put(portStatus.getId(), portStatus.getName());
|
||||||
componentToParentGroupMap.put(portStatus.getId(), status.getId());
|
componentToParentGroupMap.put(portStatus.getId(), status.getId());
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus()) {
|
for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus()) {
|
||||||
componentMap.put(rpgStatus.getId(), rpgStatus.getName());
|
componentNameMap.put(rpgStatus.getId(), rpgStatus.getName());
|
||||||
componentToParentGroupMap.put(rpgStatus.getId(), status.getId());
|
componentToParentGroupMap.put(rpgStatus.getId(), status.getId());
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final ConnectionStatus connectionStatus : status.getConnectionStatus()) {
|
for (final ConnectionStatus connectionStatus : status.getConnectionStatus()) {
|
||||||
componentMap.put(connectionStatus.getId(), connectionStatus.getName());
|
componentNameMap.put(connectionStatus.getId(), connectionStatus.getName());
|
||||||
componentToParentGroupMap.put(connectionStatus.getId(), status.getId());
|
componentToParentGroupMap.put(connectionStatus.getId(), status.getId());
|
||||||
|
// Add source and destination for Remote Input/Output Ports because metadata for those are only available at ConnectionStatus.
|
||||||
|
componentNameMap.computeIfAbsent(connectionStatus.getSourceId(), k -> connectionStatus.getSourceName());
|
||||||
|
componentNameMap.computeIfAbsent(connectionStatus.getDestinationId(), k -> connectionStatus.getDestinationName());
|
||||||
|
sourceToConnectionParentGroupMap.put(connectionStatus.getSourceId(), connectionStatus.getGroupId());
|
||||||
|
destinationToConnectionParentGroupMap.put(connectionStatus.getDestinationId(), connectionStatus.getGroupId());
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final ProcessGroupStatus childGroup : status.getProcessGroupStatus()) {
|
for (final ProcessGroupStatus childGroup : status.getProcessGroupStatus()) {
|
||||||
componentMap.put(childGroup.getId(), childGroup.getName());
|
componentNameMap.put(childGroup.getId(), childGroup.getName());
|
||||||
componentToParentGroupMap.put(childGroup.getId(), status.getId());
|
componentToParentGroupMap.put(childGroup.getId(), status.getId());
|
||||||
holder.putAll(createComponentMap(childGroup));
|
holder.putAll(createComponentMap(childGroup));
|
||||||
}
|
}
|
||||||
|
|
|
@ -113,7 +113,7 @@ public class ProvenanceEventConsumer {
|
||||||
this.logger = logger;
|
this.logger = logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void consumeEvents(final ReportingContext context, final StateManager stateManager,
|
public void consumeEvents(final ReportingContext context,
|
||||||
final BiConsumer<ComponentMapHolder, List<ProvenanceEventRecord>> consumer) throws ProcessException {
|
final BiConsumer<ComponentMapHolder, List<ProvenanceEventRecord>> consumer) throws ProcessException {
|
||||||
|
|
||||||
if (context == null) {
|
if (context == null) {
|
||||||
|
@ -123,6 +123,7 @@ public class ProvenanceEventConsumer {
|
||||||
final EventAccess eventAccess = context.getEventAccess();
|
final EventAccess eventAccess = context.getEventAccess();
|
||||||
final ProcessGroupStatus procGroupStatus = eventAccess.getControllerStatus();
|
final ProcessGroupStatus procGroupStatus = eventAccess.getControllerStatus();
|
||||||
final ComponentMapHolder componentMapHolder = ComponentMapHolder.createComponentMap(procGroupStatus);
|
final ComponentMapHolder componentMapHolder = ComponentMapHolder.createComponentMap(procGroupStatus);
|
||||||
|
final StateManager stateManager = context.getStateManager();
|
||||||
|
|
||||||
Long currMaxId = eventAccess.getProvenanceRepository().getMaxEventId();
|
Long currMaxId = eventAccess.getProvenanceRepository().getMaxEventId();
|
||||||
|
|
||||||
|
@ -234,12 +235,16 @@ public class ProvenanceEventConsumer {
|
||||||
|
|
||||||
for (ProvenanceEventRecord provenanceEventRecord : provenanceEvents) {
|
for (ProvenanceEventRecord provenanceEventRecord : provenanceEvents) {
|
||||||
if(!componentIds.isEmpty() && !componentIds.contains(provenanceEventRecord.getComponentId())) {
|
if(!componentIds.isEmpty() && !componentIds.contains(provenanceEventRecord.getComponentId())) {
|
||||||
// If we aren't filtering it out based on component ID, let's see if this component has a parent process group ID
|
// If we aren't filtering it out based on component ID, let's see if this component has any parent process group ID
|
||||||
// that is being filtered on
|
// that is being filtered on
|
||||||
if (componentMapHolder == null || componentMapHolder.getComponentToParentGroupMap().isEmpty()) {
|
if (componentMapHolder == null) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (!componentIds.contains(componentMapHolder.getComponentToParentGroupMap().get(provenanceEventRecord.getComponentId()))) {
|
final String processGroupId = componentMapHolder.getProcessGroupId(provenanceEventRecord.getComponentId(), provenanceEventRecord.getComponentType());
|
||||||
|
if (processGroupId == null || processGroupId.isEmpty()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (componentMapHolder.getProcessGroupIdStack(processGroupId).stream().noneMatch(pgid -> componentIds.contains(pgid))) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -203,14 +203,14 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
|
||||||
final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
|
final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
|
||||||
df.setTimeZone(TimeZone.getTimeZone("Z"));
|
df.setTimeZone(TimeZone.getTimeZone("Z"));
|
||||||
|
|
||||||
consumer.consumeEvents(context, context.getStateManager(), (mapHolder, events) -> {
|
consumer.consumeEvents(context, (mapHolder, events) -> {
|
||||||
final long start = System.nanoTime();
|
final long start = System.nanoTime();
|
||||||
// Create a JSON array of all the events in the current batch
|
// Create a JSON array of all the events in the current batch
|
||||||
final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
|
final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
|
||||||
for (final ProvenanceEventRecord event : events) {
|
for (final ProvenanceEventRecord event : events) {
|
||||||
final String componentName = mapHolder.getComponentName(event.getComponentId());
|
final String componentName = mapHolder.getComponentName(event.getComponentId());
|
||||||
final String processGroupId = mapHolder.getProcessGroupId(event.getComponentId());
|
final String processGroupId = mapHolder.getProcessGroupId(event.getComponentId(), event.getComponentType());
|
||||||
final String processGroupName = mapHolder.getComponentMap().get(processGroupId);
|
final String processGroupName = mapHolder.getComponentName(processGroupId);
|
||||||
arrayBuilder.add(serialize(factory, builder, event, df, componentName, processGroupId, processGroupName, hostname, url, rootGroupName, platform, nodeId));
|
arrayBuilder.add(serialize(factory, builder, event, df, componentName, processGroupId, processGroupName, hostname, url, rootGroupName, platform, nodeId));
|
||||||
}
|
}
|
||||||
final JsonArray jsonArray = arrayBuilder.build();
|
final JsonArray jsonArray = arrayBuilder.build();
|
||||||
|
@ -219,8 +219,8 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
|
||||||
try {
|
try {
|
||||||
final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
|
final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
|
||||||
if (transaction == null) {
|
if (transaction == null) {
|
||||||
getLogger().debug("All destination nodes are penalized; will attempt to send data later");
|
// Throw an exception so that the provenance event id does not advance and those events can be consumed again.
|
||||||
return;
|
throw new ProcessException("All destination nodes are penalized; will attempt to send data later");
|
||||||
}
|
}
|
||||||
|
|
||||||
final Map<String, String> attributes = new HashMap<>();
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
|
Loading…
Reference in New Issue