NIFI-4707: Build full component map for ID -> Name association in provenance reporting"

NIFI-4707: Add process group ID/name to S2SProvReportingTask records

NIFI-4707: Added support for filtering provenance on process group ID

NIFI-4707: Fixed support for provenance in Atlas reporting task

NIFI-4707: Refactored common code into reporting-utils, fixed filtering
This commit is contained in:
Matthew Burgess 2017-12-18 13:44:21 -05:00
parent b7c9c88f9f
commit 1f793923a4
4 changed files with 144 additions and 54 deletions

View File

@ -640,7 +640,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
final AnalysisContext analysisContext = new StandardAnalysisContext(nifiFlow, clusterResolvers,
// FIXME: This class cast shouldn't be necessary to query lineage. Possible refactor target in next major update.
(ProvenanceRepository)eventAccess.getProvenanceRepository());
consumer.consumeEvents(eventAccess, context.getStateManager(), events -> {
consumer.consumeEvents(context, context.getStateManager(), (componentMapHolder, events) -> {
for (ProvenanceEventRecord event : events) {
try {
lineageStrategy.processEvent(analysisContext, nifiFlow, event);

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.util.provenance;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import java.util.HashMap;
import java.util.Map;
public class ComponentMapHolder {
final Map<String,String> componentMap = new HashMap<>();
final Map<String,String> componentToParentGroupMap = new HashMap<>();
public ComponentMapHolder putAll(ComponentMapHolder holder) {
this.componentMap.putAll(holder.getComponentMap());
this.componentToParentGroupMap.putAll(holder.getComponentToParentGroupMap());
return this;
}
public Map<String, String> getComponentMap() {
return componentMap;
}
public Map<String, String> getComponentToParentGroupMap() {
return componentToParentGroupMap;
}
public String getComponentName(final String componentId) {
return componentMap.get(componentId);
}
public String getProcessGroupId(final String componentId) {
return componentToParentGroupMap.get(componentId);
}
public static ComponentMapHolder createComponentMap(final ProcessGroupStatus status) {
final ComponentMapHolder holder = new ComponentMapHolder();
final Map<String,String> componentMap = holder.getComponentMap();
final Map<String,String> componentToParentGroupMap = holder.getComponentToParentGroupMap();
if (status != null) {
componentMap.put(status.getId(), status.getName());
for (final ProcessorStatus procStatus : status.getProcessorStatus()) {
componentMap.put(procStatus.getId(), procStatus.getName());
componentToParentGroupMap.put(procStatus.getId(), status.getId());
}
for (final PortStatus portStatus : status.getInputPortStatus()) {
componentMap.put(portStatus.getId(), portStatus.getName());
componentToParentGroupMap.put(portStatus.getId(), status.getId());
}
for (final PortStatus portStatus : status.getOutputPortStatus()) {
componentMap.put(portStatus.getId(), portStatus.getName());
componentToParentGroupMap.put(portStatus.getId(), status.getId());
}
for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus()) {
componentMap.put(rpgStatus.getId(), rpgStatus.getName());
componentToParentGroupMap.put(rpgStatus.getId(), status.getId());
}
for (final ConnectionStatus connectionStatus : status.getConnectionStatus()) {
componentMap.put(connectionStatus.getId(), connectionStatus.getName());
componentToParentGroupMap.put(connectionStatus.getId(), status.getId());
}
for (final ProcessGroupStatus childGroup : status.getProcessGroupStatus()) {
componentMap.put(childGroup.getId(), childGroup.getName());
componentToParentGroupMap.put(childGroup.getId(), status.getId());
holder.putAll(createComponentMap(childGroup));
}
}
return holder;
}
}

View File

@ -21,19 +21,21 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.ReportingContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import java.util.regex.Pattern;
public class ProvenanceEventConsumer {
@ -111,8 +113,16 @@ public class ProvenanceEventConsumer {
this.logger = logger;
}
public void consumeEvents(final EventAccess eventAccess, final StateManager stateManager,
final Consumer<List<ProvenanceEventRecord>> consumer) throws ProcessException {
public void consumeEvents(final ReportingContext context, final StateManager stateManager,
final BiConsumer<ComponentMapHolder, List<ProvenanceEventRecord>> consumer) throws ProcessException {
if (context == null) {
logger.debug("No ReportingContext available.");
return;
}
final EventAccess eventAccess = context.getEventAccess();
final ProcessGroupStatus procGroupStatus = eventAccess.getControllerStatus();
final ComponentMapHolder componentMapHolder = ComponentMapHolder.createComponentMap(procGroupStatus);
Long currMaxId = eventAccess.getProvenanceRepository().getMaxEventId();
@ -160,7 +170,7 @@ public class ProvenanceEventConsumer {
List<ProvenanceEventRecord> filteredEvents;
try {
rawEvents = eventAccess.getProvenanceEvents(firstEventId, batchSize);
filteredEvents = filterEvents(rawEvents);
filteredEvents = filterEvents(componentMapHolder, rawEvents);
} catch (final IOException ioe) {
logger.error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe);
return;
@ -176,7 +186,7 @@ public class ProvenanceEventConsumer {
if (!filteredEvents.isEmpty()) {
// Executes callback.
consumer.accept(filteredEvents);
consumer.accept(componentMapHolder, filteredEvents);
}
firstEventId = updateLastEventId(rawEvents, stateManager);
@ -184,7 +194,7 @@ public class ProvenanceEventConsumer {
// Retrieve the next batch
try {
rawEvents = eventAccess.getProvenanceEvents(firstEventId, batchSize);
filteredEvents = filterEvents(rawEvents);
filteredEvents = filterEvents(componentMapHolder, rawEvents);
} catch (final IOException ioe) {
logger.error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe);
return;
@ -218,13 +228,20 @@ public class ProvenanceEventConsumer {
return componentTypeRegex != null || !eventTypes.isEmpty() || !componentIds.isEmpty();
}
private List<ProvenanceEventRecord> filterEvents(List<ProvenanceEventRecord> provenanceEvents) {
private List<ProvenanceEventRecord> filterEvents(ComponentMapHolder componentMapHolder, List<ProvenanceEventRecord> provenanceEvents) {
if(isFilteringEnabled()) {
List<ProvenanceEventRecord> filteredEvents = new ArrayList<ProvenanceEventRecord>();
List<ProvenanceEventRecord> filteredEvents = new ArrayList<>();
for (ProvenanceEventRecord provenanceEventRecord : provenanceEvents) {
if(!componentIds.isEmpty() && !componentIds.contains(provenanceEventRecord.getComponentId())) {
continue;
// If we aren't filtering it out based on component ID, let's see if this component has a parent process group ID
// that is being filtered on
if (componentMapHolder == null || componentMapHolder.getComponentToParentGroupMap().isEmpty()) {
continue;
}
if (!componentIds.contains(componentMapHolder.getComponentToParentGroupMap().get(provenanceEventRecord.getComponentId()))) {
continue;
}
}
if(!eventTypes.isEmpty() && !eventTypes.contains(provenanceEventRecord.getEventType())) {
continue;

View File

@ -28,10 +28,7 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.provenance.ProvenanceEventRecord;
@ -46,6 +43,7 @@ import javax.json.JsonArrayBuilder;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.JsonValue;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
@ -174,36 +172,6 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
return properties;
}
private Map<String,String> createComponentMap(final ProcessGroupStatus status) {
final Map<String,String> componentMap = new HashMap<>();
if (status != null) {
componentMap.put(status.getId(), status.getName());
for (final ProcessorStatus procStatus : status.getProcessorStatus()) {
componentMap.put(procStatus.getId(), procStatus.getName());
}
for (final PortStatus portStatus : status.getInputPortStatus()) {
componentMap.put(portStatus.getId(), portStatus.getName());
}
for (final PortStatus portStatus : status.getOutputPortStatus()) {
componentMap.put(portStatus.getId(), portStatus.getName());
}
for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus()) {
componentMap.put(rpgStatus.getId(), rpgStatus.getName());
}
for (final ProcessGroupStatus childGroup : status.getProcessGroupStatus()) {
componentMap.put(childGroup.getId(), childGroup.getName());
}
}
return componentMap;
}
@Override
public void onTrigger(final ReportingContext context) {
final boolean isClustered = context.isClustered();
@ -216,8 +184,6 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
final ProcessGroupStatus procGroupStatus = context.getEventAccess().getControllerStatus();
final String rootGroupName = procGroupStatus == null ? null : procGroupStatus.getName();
final Map<String,String> componentMap = createComponentMap(procGroupStatus);
final String nifiUrl = context.getProperty(INSTANCE_URL).evaluateAttributeExpressions().getValue();
URL url;
try {
@ -237,13 +203,15 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
df.setTimeZone(TimeZone.getTimeZone("Z"));
consumer.consumeEvents(context.getEventAccess(), context.getStateManager(), events -> {
consumer.consumeEvents(context, context.getStateManager(), (mapHolder, events) -> {
final long start = System.nanoTime();
// Create a JSON array of all the events in the current batch
final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
for (final ProvenanceEventRecord event : events) {
final String componentName = componentMap.get(event.getComponentId());
arrayBuilder.add(serialize(factory, builder, event, df, componentName, hostname, url, rootGroupName, platform, nodeId));
final String componentName = mapHolder.getComponentName(event.getComponentId());
final String processGroupId = mapHolder.getProcessGroupId(event.getComponentId());
final String processGroupName = mapHolder.getComponentMap().get(processGroupId);
arrayBuilder.add(serialize(factory, builder, event, df, componentName, processGroupId, processGroupName, hostname, url, rootGroupName, platform, nodeId));
}
final JsonArray jsonArray = arrayBuilder.build();
@ -277,7 +245,8 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final ProvenanceEventRecord event, final DateFormat df,
final String componentName, final String hostname, final URL nifiUrl, final String applicationName, final String platform, final String nodeIdentifier) {
final String componentName, final String processGroupId, final String processGroupName, final String hostname, final URL nifiUrl, final String applicationName,
final String platform, final String nodeIdentifier) {
addField(builder, "eventId", UUID.randomUUID().toString());
addField(builder, "eventOrdinal", event.getEventId());
addField(builder, "eventType", event.getEventType().name());
@ -289,6 +258,8 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
addField(builder, "componentId", event.getComponentId());
addField(builder, "componentType", event.getComponentType());
addField(builder, "componentName", componentName);
addField(builder, "processGroupId", processGroupId, true);
addField(builder, "processGroupName", processGroupName, true);
addField(builder, "entityId", event.getFlowFileUuid());
addField(builder, "entityType", "org.apache.nifi.flowfile.FlowFile");
addField(builder, "entitySize", event.getFileSize());
@ -352,11 +323,17 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
}
private static void addField(final JsonObjectBuilder builder, final String key, final String value) {
if (value == null) {
return;
}
addField(builder, key, value, false);
}
builder.add(key, value);
private static void addField(final JsonObjectBuilder builder, final String key, final String value, final boolean allowNullValues) {
if (value == null) {
if (allowNullValues) {
builder.add(key, JsonValue.NULL);
}
} else {
builder.add(key, value);
}
}
private static JsonArrayBuilder createJsonArray(JsonBuilderFactory factory, final Collection<String> values) {
@ -368,5 +345,4 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
}
return builder;
}
}