diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java index 4c78ef7214..f722b9d53a 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java @@ -640,7 +640,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask { final AnalysisContext analysisContext = new StandardAnalysisContext(nifiFlow, clusterResolvers, // FIXME: This class cast shouldn't be necessary to query lineage. Possible refactor target in next major update. (ProvenanceRepository)eventAccess.getProvenanceRepository()); - consumer.consumeEvents(eventAccess, context.getStateManager(), events -> { + consumer.consumeEvents(context, context.getStateManager(), (componentMapHolder, events) -> { for (ProvenanceEventRecord event : events) { try { lineageStrategy.processEvent(analysisContext, nifiFlow, event); diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java new file mode 100644 index 0000000000..495968ade9 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.util.provenance; + +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.controller.status.PortStatus; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.RemoteProcessGroupStatus; + +import java.util.HashMap; +import java.util.Map; + +public class ComponentMapHolder { + final Map componentMap = new HashMap<>(); + final Map componentToParentGroupMap = new HashMap<>(); + + public ComponentMapHolder putAll(ComponentMapHolder holder) { + this.componentMap.putAll(holder.getComponentMap()); + this.componentToParentGroupMap.putAll(holder.getComponentToParentGroupMap()); + return this; + } + + public Map getComponentMap() { + return componentMap; + } + + public Map getComponentToParentGroupMap() { + return componentToParentGroupMap; + } + + public String getComponentName(final String componentId) { + return componentMap.get(componentId); + } + + public String getProcessGroupId(final String componentId) { + return componentToParentGroupMap.get(componentId); + } + + public static ComponentMapHolder createComponentMap(final ProcessGroupStatus status) { + final ComponentMapHolder holder = new ComponentMapHolder(); + final Map componentMap = holder.getComponentMap(); + final Map componentToParentGroupMap = holder.getComponentToParentGroupMap(); + + if (status != null) { + componentMap.put(status.getId(), status.getName()); + + for (final ProcessorStatus procStatus : status.getProcessorStatus()) { + componentMap.put(procStatus.getId(), procStatus.getName()); + componentToParentGroupMap.put(procStatus.getId(), status.getId()); + } + + for (final PortStatus portStatus : status.getInputPortStatus()) { + componentMap.put(portStatus.getId(), portStatus.getName()); + componentToParentGroupMap.put(portStatus.getId(), status.getId()); + } + + for (final PortStatus portStatus : status.getOutputPortStatus()) { + componentMap.put(portStatus.getId(), portStatus.getName()); + componentToParentGroupMap.put(portStatus.getId(), status.getId()); + } + + for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus()) { + componentMap.put(rpgStatus.getId(), rpgStatus.getName()); + componentToParentGroupMap.put(rpgStatus.getId(), status.getId()); + } + + for (final ConnectionStatus connectionStatus : status.getConnectionStatus()) { + componentMap.put(connectionStatus.getId(), connectionStatus.getName()); + componentToParentGroupMap.put(connectionStatus.getId(), status.getId()); + } + + for (final ProcessGroupStatus childGroup : status.getProcessGroupStatus()) { + componentMap.put(childGroup.getId(), childGroup.getName()); + componentToParentGroupMap.put(childGroup.getId(), status.getId()); + holder.putAll(createComponentMap(childGroup)); + } + } + + return holder; + } + +} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java index 1cbdbf175b..825662693d 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java @@ -21,19 +21,21 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.reporting.EventAccess; +import org.apache.nifi.reporting.ReportingContext; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.Consumer; +import java.util.function.BiConsumer; import java.util.regex.Pattern; public class ProvenanceEventConsumer { @@ -111,8 +113,16 @@ public class ProvenanceEventConsumer { this.logger = logger; } - public void consumeEvents(final EventAccess eventAccess, final StateManager stateManager, - final Consumer> consumer) throws ProcessException { + public void consumeEvents(final ReportingContext context, final StateManager stateManager, + final BiConsumer> consumer) throws ProcessException { + + if (context == null) { + logger.debug("No ReportingContext available."); + return; + } + final EventAccess eventAccess = context.getEventAccess(); + final ProcessGroupStatus procGroupStatus = eventAccess.getControllerStatus(); + final ComponentMapHolder componentMapHolder = ComponentMapHolder.createComponentMap(procGroupStatus); Long currMaxId = eventAccess.getProvenanceRepository().getMaxEventId(); @@ -160,7 +170,7 @@ public class ProvenanceEventConsumer { List filteredEvents; try { rawEvents = eventAccess.getProvenanceEvents(firstEventId, batchSize); - filteredEvents = filterEvents(rawEvents); + filteredEvents = filterEvents(componentMapHolder, rawEvents); } catch (final IOException ioe) { logger.error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe); return; @@ -176,7 +186,7 @@ public class ProvenanceEventConsumer { if (!filteredEvents.isEmpty()) { // Executes callback. - consumer.accept(filteredEvents); + consumer.accept(componentMapHolder, filteredEvents); } firstEventId = updateLastEventId(rawEvents, stateManager); @@ -184,7 +194,7 @@ public class ProvenanceEventConsumer { // Retrieve the next batch try { rawEvents = eventAccess.getProvenanceEvents(firstEventId, batchSize); - filteredEvents = filterEvents(rawEvents); + filteredEvents = filterEvents(componentMapHolder, rawEvents); } catch (final IOException ioe) { logger.error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe); return; @@ -218,13 +228,20 @@ public class ProvenanceEventConsumer { return componentTypeRegex != null || !eventTypes.isEmpty() || !componentIds.isEmpty(); } - private List filterEvents(List provenanceEvents) { + private List filterEvents(ComponentMapHolder componentMapHolder, List provenanceEvents) { if(isFilteringEnabled()) { - List filteredEvents = new ArrayList(); + List filteredEvents = new ArrayList<>(); for (ProvenanceEventRecord provenanceEventRecord : provenanceEvents) { if(!componentIds.isEmpty() && !componentIds.contains(provenanceEventRecord.getComponentId())) { - continue; + // If we aren't filtering it out based on component ID, let's see if this component has a parent process group ID + // that is being filtered on + if (componentMapHolder == null || componentMapHolder.getComponentToParentGroupMap().isEmpty()) { + continue; + } + if (!componentIds.contains(componentMapHolder.getComponentToParentGroupMap().get(provenanceEventRecord.getComponentId()))) { + continue; + } } if(!eventTypes.isEmpty() && !eventTypes.contains(provenanceEventRecord.getEventType())) { continue; diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java index 8b8048b295..c99e9d8ca1 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java @@ -28,10 +28,7 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.Scope; import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.controller.status.PortStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; -import org.apache.nifi.controller.status.ProcessorStatus; -import org.apache.nifi.controller.status.RemoteProcessGroupStatus; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.provenance.ProvenanceEventRecord; @@ -46,6 +43,7 @@ import javax.json.JsonArrayBuilder; import javax.json.JsonBuilderFactory; import javax.json.JsonObject; import javax.json.JsonObjectBuilder; +import javax.json.JsonValue; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; @@ -174,36 +172,6 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti return properties; } - private Map createComponentMap(final ProcessGroupStatus status) { - final Map componentMap = new HashMap<>(); - - if (status != null) { - componentMap.put(status.getId(), status.getName()); - - for (final ProcessorStatus procStatus : status.getProcessorStatus()) { - componentMap.put(procStatus.getId(), procStatus.getName()); - } - - for (final PortStatus portStatus : status.getInputPortStatus()) { - componentMap.put(portStatus.getId(), portStatus.getName()); - } - - for (final PortStatus portStatus : status.getOutputPortStatus()) { - componentMap.put(portStatus.getId(), portStatus.getName()); - } - - for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus()) { - componentMap.put(rpgStatus.getId(), rpgStatus.getName()); - } - - for (final ProcessGroupStatus childGroup : status.getProcessGroupStatus()) { - componentMap.put(childGroup.getId(), childGroup.getName()); - } - } - - return componentMap; - } - @Override public void onTrigger(final ReportingContext context) { final boolean isClustered = context.isClustered(); @@ -216,8 +184,6 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti final ProcessGroupStatus procGroupStatus = context.getEventAccess().getControllerStatus(); final String rootGroupName = procGroupStatus == null ? null : procGroupStatus.getName(); - final Map componentMap = createComponentMap(procGroupStatus); - final String nifiUrl = context.getProperty(INSTANCE_URL).evaluateAttributeExpressions().getValue(); URL url; try { @@ -237,13 +203,15 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT); df.setTimeZone(TimeZone.getTimeZone("Z")); - consumer.consumeEvents(context.getEventAccess(), context.getStateManager(), events -> { + consumer.consumeEvents(context, context.getStateManager(), (mapHolder, events) -> { final long start = System.nanoTime(); // Create a JSON array of all the events in the current batch final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder(); for (final ProvenanceEventRecord event : events) { - final String componentName = componentMap.get(event.getComponentId()); - arrayBuilder.add(serialize(factory, builder, event, df, componentName, hostname, url, rootGroupName, platform, nodeId)); + final String componentName = mapHolder.getComponentName(event.getComponentId()); + final String processGroupId = mapHolder.getProcessGroupId(event.getComponentId()); + final String processGroupName = mapHolder.getComponentMap().get(processGroupId); + arrayBuilder.add(serialize(factory, builder, event, df, componentName, processGroupId, processGroupName, hostname, url, rootGroupName, platform, nodeId)); } final JsonArray jsonArray = arrayBuilder.build(); @@ -277,7 +245,8 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final ProvenanceEventRecord event, final DateFormat df, - final String componentName, final String hostname, final URL nifiUrl, final String applicationName, final String platform, final String nodeIdentifier) { + final String componentName, final String processGroupId, final String processGroupName, final String hostname, final URL nifiUrl, final String applicationName, + final String platform, final String nodeIdentifier) { addField(builder, "eventId", UUID.randomUUID().toString()); addField(builder, "eventOrdinal", event.getEventId()); addField(builder, "eventType", event.getEventType().name()); @@ -289,6 +258,8 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti addField(builder, "componentId", event.getComponentId()); addField(builder, "componentType", event.getComponentType()); addField(builder, "componentName", componentName); + addField(builder, "processGroupId", processGroupId, true); + addField(builder, "processGroupName", processGroupName, true); addField(builder, "entityId", event.getFlowFileUuid()); addField(builder, "entityType", "org.apache.nifi.flowfile.FlowFile"); addField(builder, "entitySize", event.getFileSize()); @@ -352,11 +323,17 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti } private static void addField(final JsonObjectBuilder builder, final String key, final String value) { - if (value == null) { - return; - } + addField(builder, key, value, false); + } - builder.add(key, value); + private static void addField(final JsonObjectBuilder builder, final String key, final String value, final boolean allowNullValues) { + if (value == null) { + if (allowNullValues) { + builder.add(key, JsonValue.NULL); + } + } else { + builder.add(key, value); + } } private static JsonArrayBuilder createJsonArray(JsonBuilderFactory factory, final Collection values) { @@ -368,5 +345,4 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti } return builder; } - }