From d65e6b25630fa918ede2cd6922dc777e816679c3 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Wed, 20 Dec 2017 15:51:58 +0900 Subject: [PATCH] NIFI-4707: Improved S2SProvenanceReportingTask - Simplified consumeEvents method signature - Refactored ComponentMapHolder methods visibility - Renamed componentMap to componentNameMap - Map more metadata from ConnectionStatus for Remote Input/Output Ports - Support Process Group hierachy filtering - Throw an exception when the reporting task fails to send provenance data to keep current provenance event index so that events can be consumed again --- .../atlas/reporting/ReportLineageToAtlas.java | 2 +- .../util/provenance/ComponentMapHolder.java | 75 +++++++++++++------ .../provenance/ProvenanceEventConsumer.java | 13 +++- .../SiteToSiteProvenanceReportingTask.java | 10 +-- 4 files changed, 66 insertions(+), 34 deletions(-) diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java index f722b9d53a..5bb60244f7 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java @@ -640,7 +640,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask { final AnalysisContext analysisContext = new StandardAnalysisContext(nifiFlow, clusterResolvers, // FIXME: This class cast shouldn't be necessary to query lineage. Possible refactor target in next major update. (ProvenanceRepository)eventAccess.getProvenanceRepository()); - consumer.consumeEvents(context, context.getStateManager(), (componentMapHolder, events) -> { + consumer.consumeEvents(context, (componentMapHolder, events) -> { for (ProvenanceEventRecord event : events) { try { lineageStrategy.processEvent(analysisContext, nifiFlow, event); diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java index 495968ade9..342b5a27b0 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java @@ -24,68 +24,95 @@ import org.apache.nifi.controller.status.RemoteProcessGroupStatus; import java.util.HashMap; import java.util.Map; +import java.util.Stack; public class ComponentMapHolder { - final Map componentMap = new HashMap<>(); - final Map componentToParentGroupMap = new HashMap<>(); + private static final String REMOTE_INPUT_PORT = "Remote Input Port"; + private static final String REMOTE_OUTPUT_PORT = "Remote Output Port"; + private final Map componentNameMap = new HashMap<>(); + private final Map componentToParentGroupMap = new HashMap<>(); + private final Map sourceToConnectionParentGroupMap = new HashMap<>(); + private final Map destinationToConnectionParentGroupMap = new HashMap<>(); - public ComponentMapHolder putAll(ComponentMapHolder holder) { - this.componentMap.putAll(holder.getComponentMap()); - this.componentToParentGroupMap.putAll(holder.getComponentToParentGroupMap()); + private ComponentMapHolder putAll(ComponentMapHolder holder) { + this.componentNameMap.putAll(holder.componentNameMap); + this.componentToParentGroupMap.putAll(holder.componentToParentGroupMap); + this.sourceToConnectionParentGroupMap.putAll(holder.sourceToConnectionParentGroupMap); + this.destinationToConnectionParentGroupMap.putAll(holder.destinationToConnectionParentGroupMap); return this; } - public Map getComponentMap() { - return componentMap; - } - - public Map getComponentToParentGroupMap() { - return componentToParentGroupMap; - } - public String getComponentName(final String componentId) { - return componentMap.get(componentId); + return componentNameMap.get(componentId); } - public String getProcessGroupId(final String componentId) { + public Stack getProcessGroupIdStack(final String startingProcessGroupId) { + final Stack stack = new Stack<>(); + String processGroupId = startingProcessGroupId; + stack.push(startingProcessGroupId); + while (componentToParentGroupMap.containsKey(processGroupId)) { + final String parentGroupId = componentToParentGroupMap.get(processGroupId); + if (parentGroupId == null || parentGroupId.isEmpty()) { + break; + } + stack.push(parentGroupId); + processGroupId = parentGroupId; + } + return stack; + } + + public String getProcessGroupId(final String componentId, final String componentType) { + // Where a Remote Input/Output Port resides is only available at ConnectionStatus. + if (REMOTE_INPUT_PORT.equals(componentType)) { + return destinationToConnectionParentGroupMap.get(componentId); + } else if (REMOTE_OUTPUT_PORT.equals(componentType)) { + return sourceToConnectionParentGroupMap.get(componentId); + } return componentToParentGroupMap.get(componentId); } public static ComponentMapHolder createComponentMap(final ProcessGroupStatus status) { final ComponentMapHolder holder = new ComponentMapHolder(); - final Map componentMap = holder.getComponentMap(); - final Map componentToParentGroupMap = holder.getComponentToParentGroupMap(); + final Map componentNameMap = holder.componentNameMap; + final Map componentToParentGroupMap = holder.componentToParentGroupMap; + final Map sourceToConnectionParentGroupMap = holder.sourceToConnectionParentGroupMap; + final Map destinationToConnectionParentGroupMap = holder.destinationToConnectionParentGroupMap; if (status != null) { - componentMap.put(status.getId(), status.getName()); + componentNameMap.put(status.getId(), status.getName()); for (final ProcessorStatus procStatus : status.getProcessorStatus()) { - componentMap.put(procStatus.getId(), procStatus.getName()); + componentNameMap.put(procStatus.getId(), procStatus.getName()); componentToParentGroupMap.put(procStatus.getId(), status.getId()); } for (final PortStatus portStatus : status.getInputPortStatus()) { - componentMap.put(portStatus.getId(), portStatus.getName()); + componentNameMap.put(portStatus.getId(), portStatus.getName()); componentToParentGroupMap.put(portStatus.getId(), status.getId()); } for (final PortStatus portStatus : status.getOutputPortStatus()) { - componentMap.put(portStatus.getId(), portStatus.getName()); + componentNameMap.put(portStatus.getId(), portStatus.getName()); componentToParentGroupMap.put(portStatus.getId(), status.getId()); } for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus()) { - componentMap.put(rpgStatus.getId(), rpgStatus.getName()); + componentNameMap.put(rpgStatus.getId(), rpgStatus.getName()); componentToParentGroupMap.put(rpgStatus.getId(), status.getId()); } for (final ConnectionStatus connectionStatus : status.getConnectionStatus()) { - componentMap.put(connectionStatus.getId(), connectionStatus.getName()); + componentNameMap.put(connectionStatus.getId(), connectionStatus.getName()); componentToParentGroupMap.put(connectionStatus.getId(), status.getId()); + // Add source and destination for Remote Input/Output Ports because metadata for those are only available at ConnectionStatus. + componentNameMap.computeIfAbsent(connectionStatus.getSourceId(), k -> connectionStatus.getSourceName()); + componentNameMap.computeIfAbsent(connectionStatus.getDestinationId(), k -> connectionStatus.getDestinationName()); + sourceToConnectionParentGroupMap.put(connectionStatus.getSourceId(), connectionStatus.getGroupId()); + destinationToConnectionParentGroupMap.put(connectionStatus.getDestinationId(), connectionStatus.getGroupId()); } for (final ProcessGroupStatus childGroup : status.getProcessGroupStatus()) { - componentMap.put(childGroup.getId(), childGroup.getName()); + componentNameMap.put(childGroup.getId(), childGroup.getName()); componentToParentGroupMap.put(childGroup.getId(), status.getId()); holder.putAll(createComponentMap(childGroup)); } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java index 825662693d..75c1e6037a 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java @@ -113,7 +113,7 @@ public class ProvenanceEventConsumer { this.logger = logger; } - public void consumeEvents(final ReportingContext context, final StateManager stateManager, + public void consumeEvents(final ReportingContext context, final BiConsumer> consumer) throws ProcessException { if (context == null) { @@ -123,6 +123,7 @@ public class ProvenanceEventConsumer { final EventAccess eventAccess = context.getEventAccess(); final ProcessGroupStatus procGroupStatus = eventAccess.getControllerStatus(); final ComponentMapHolder componentMapHolder = ComponentMapHolder.createComponentMap(procGroupStatus); + final StateManager stateManager = context.getStateManager(); Long currMaxId = eventAccess.getProvenanceRepository().getMaxEventId(); @@ -234,12 +235,16 @@ public class ProvenanceEventConsumer { for (ProvenanceEventRecord provenanceEventRecord : provenanceEvents) { if(!componentIds.isEmpty() && !componentIds.contains(provenanceEventRecord.getComponentId())) { - // If we aren't filtering it out based on component ID, let's see if this component has a parent process group ID + // If we aren't filtering it out based on component ID, let's see if this component has a parent process group IDs // that is being filtered on - if (componentMapHolder == null || componentMapHolder.getComponentToParentGroupMap().isEmpty()) { + if (componentMapHolder == null) { continue; } - if (!componentIds.contains(componentMapHolder.getComponentToParentGroupMap().get(provenanceEventRecord.getComponentId()))) { + final String processGroupId = componentMapHolder.getProcessGroupId(provenanceEventRecord.getComponentId(), provenanceEventRecord.getComponentType()); + if (processGroupId == null || processGroupId.isEmpty()) { + continue; + } + if (componentMapHolder.getProcessGroupIdStack(processGroupId).stream().noneMatch(pgid -> componentIds.contains(pgid))) { continue; } } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java index c99e9d8ca1..61c8bc4219 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java @@ -203,14 +203,14 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT); df.setTimeZone(TimeZone.getTimeZone("Z")); - consumer.consumeEvents(context, context.getStateManager(), (mapHolder, events) -> { + consumer.consumeEvents(context, (mapHolder, events) -> { final long start = System.nanoTime(); // Create a JSON array of all the events in the current batch final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder(); for (final ProvenanceEventRecord event : events) { final String componentName = mapHolder.getComponentName(event.getComponentId()); - final String processGroupId = mapHolder.getProcessGroupId(event.getComponentId()); - final String processGroupName = mapHolder.getComponentMap().get(processGroupId); + final String processGroupId = mapHolder.getProcessGroupId(event.getComponentId(), event.getComponentType()); + final String processGroupName = mapHolder.getComponentName(processGroupId); arrayBuilder.add(serialize(factory, builder, event, df, componentName, processGroupId, processGroupName, hostname, url, rootGroupName, platform, nodeId)); } final JsonArray jsonArray = arrayBuilder.build(); @@ -219,8 +219,8 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti try { final Transaction transaction = getClient().createTransaction(TransferDirection.SEND); if (transaction == null) { - getLogger().debug("All destination nodes are penalized; will attempt to send data later"); - return; + // Throw an exception to avoid provenance event id will not proceed so that those can be consumed again. + throw new ProcessException("All destination nodes are penalized; will attempt to send data later"); } final Map attributes = new HashMap<>();