diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java index 18a8afe393..34734754fe 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java @@ -32,6 +32,7 @@ import org.apache.nifi.reporting.ReportingContext; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -66,8 +67,11 @@ public class ProvenanceEventConsumer { private String startPositionValue = PROVENANCE_START_POSITION.getDefaultValue(); private Pattern componentTypeRegex; - private List eventTypes = new ArrayList(); - private List componentIds = new ArrayList(); + private Pattern componentTypeRegexExclude; + private List eventTypes = new ArrayList<>(); + private List eventTypesExclude = new ArrayList<>(); + private List componentIds = new ArrayList<>(); + private List componentIdsExclude = new ArrayList<>(); private int batchSize = Integer.parseInt(PROVENANCE_BATCH_SIZE.getDefaultValue()); private volatile long firstEventId = -1L; @@ -89,16 +93,26 @@ public class ProvenanceEventConsumer { } } - public void addTargetEventType(final ProvenanceEventType... types) { - for (ProvenanceEventType type : types) { - eventTypes.add(type); + public void setComponentTypeRegexExclude(final String componentTypeRegex) { + if (!StringUtils.isBlank(componentTypeRegex)) { + this.componentTypeRegexExclude = Pattern.compile(componentTypeRegex); } } + public void addTargetEventType(final ProvenanceEventType... types) { + Collections.addAll(eventTypes, types); + } + + public void addTargetEventTypeExclude(final ProvenanceEventType... types) { + Collections.addAll(eventTypesExclude, types); + } + public void addTargetComponentId(final String... ids) { - for (String id : ids) { - componentIds.add(id); - } + Collections.addAll(componentIds, ids); + } + + public void addTargetComponentIdExclude(final String... ids) { + Collections.addAll(componentIdsExclude, ids); } public void setScheduled(boolean scheduled) { @@ -226,7 +240,8 @@ public class ProvenanceEventConsumer { private boolean isFilteringEnabled() { - return componentTypeRegex != null || !eventTypes.isEmpty() || !componentIds.isEmpty(); + return componentTypeRegex != null || !eventTypes.isEmpty() || !componentIds.isEmpty() + || componentTypeRegexExclude != null || !eventTypesExclude.isEmpty() || !componentIdsExclude.isEmpty(); } private List filterEvents(ComponentMapHolder componentMapHolder, List provenanceEvents) { @@ -234,7 +249,38 @@ public class ProvenanceEventConsumer { List filteredEvents = new ArrayList<>(); for (ProvenanceEventRecord provenanceEventRecord : provenanceEvents) { + if (!eventTypesExclude.isEmpty() && eventTypesExclude.contains(provenanceEventRecord.getEventType())) { + continue; + } + if (!eventTypes.isEmpty() && !eventTypes.contains(provenanceEventRecord.getEventType())) { + continue; + } final String componentId = provenanceEventRecord.getComponentId(); + if (!componentIdsExclude.isEmpty()) { + if (componentIdsExclude.contains(componentId)) { + continue; + } + // If we aren't excluding it based on component ID, let's see if this component has a parent process group IDs + // that is being excluded + if (componentMapHolder == null) { + continue; + } + final String processGroupId = componentMapHolder.getProcessGroupId(componentId, provenanceEventRecord.getComponentType()); + if (!StringUtils.isEmpty(processGroupId)) { + + // Check if the process group or any parent process group is specified as a target component ID. + if (componentIdsExclude.contains(processGroupId)) { + continue; + } + ParentProcessGroupSearchNode parentProcessGroup = componentMapHolder.getProcessGroupParent(processGroupId); + while (parentProcessGroup != null && !componentIdsExclude.contains(parentProcessGroup.getId())) { + parentProcessGroup = parentProcessGroup.getParent(); + } + if (parentProcessGroup != null) { + continue; + } + } + } if (!componentIds.isEmpty() && !componentIds.contains(componentId)) { // If we aren't filtering it out based on component ID, let's see if this component has a parent process group IDs // that is being filtered on @@ -245,7 +291,6 @@ public class ProvenanceEventConsumer { if (StringUtils.isEmpty(processGroupId)) { continue; } - // Check if the process group or any parent process group is specified as a target component ID. if (!componentIds.contains(processGroupId)) { ParentProcessGroupSearchNode parentProcessGroup = componentMapHolder.getProcessGroupParent(processGroupId); while (parentProcessGroup != null && !componentIds.contains(parentProcessGroup.getId())) { @@ -256,7 +301,8 @@ public class ProvenanceEventConsumer { } } } - if (!eventTypes.isEmpty() && !eventTypes.contains(provenanceEventRecord.getEventType())) { + + if (componentTypeRegexExclude != null && componentTypeRegexExclude.matcher(provenanceEventRecord.getComponentType()).matches()) { continue; } if (componentTypeRegex != null && !componentTypeRegex.matcher(provenanceEventRecord.getComponentType()).matches()) { diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java index 61c8bc4219..8331b46929 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java @@ -87,7 +87,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti static final PropertyDescriptor FILTER_EVENT_TYPE = new PropertyDescriptor.Builder() .name("s2s-prov-task-event-filter") - .displayName("Event Type") + .displayName("Event Type to Include") .description("Comma-separated list of event types that will be used to filter the provenance events sent by the reporting task. " + "Available event types are " + Arrays.deepToString(ProvenanceEventType.values()) + ". If no filter is set, all the events are sent. If " + "multiple filters are set, the filters are cumulative.") @@ -95,24 +95,55 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + static final PropertyDescriptor FILTER_EVENT_TYPE_EXCLUDE = new PropertyDescriptor.Builder() + .name("s2s-prov-task-event-filter-exclude") + .displayName("Event Type to Exclude") + .description("Comma-separated list of event types that will be used to exclude the provenance events sent by the reporting task. " + + "Available event types are " + Arrays.deepToString(ProvenanceEventType.values()) + ". If no filter is set, all the events are sent. If " + + "multiple filters are set, the filters are cumulative. If an event type is included in Event Type to Include and excluded here, then the " + + "exclusion takes precedence and the event will not be sent.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + static final PropertyDescriptor FILTER_COMPONENT_TYPE = new PropertyDescriptor.Builder() .name("s2s-prov-task-type-filter") - .displayName("Component Type") + .displayName("Component Type to Include") .description("Regular expression to filter the provenance events based on the component type. Only the events matching the regular " + "expression will be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.") .required(false) .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) .build(); + static final PropertyDescriptor FILTER_COMPONENT_TYPE_EXCLUDE = new PropertyDescriptor.Builder() + .name("s2s-prov-task-type-filter-exclude") + .displayName("Component Type to Exclude") + .description("Regular expression to exclude the provenance events based on the component type. The events matching the regular " + + "expression will not be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. " + + "If a component type is included in Component Type to Include and excluded here, then the exclusion takes precedence and the event will not be sent.") + .required(false) + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .build(); + static final PropertyDescriptor FILTER_COMPONENT_ID = new PropertyDescriptor.Builder() .name("s2s-prov-task-id-filter") - .displayName("Component ID") + .displayName("Component ID to Include") .description("Comma-separated list of component UUID that will be used to filter the provenance events sent by the reporting task. If no " + "filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + static final PropertyDescriptor FILTER_COMPONENT_ID_EXCLUDE = new PropertyDescriptor.Builder() + .name("s2s-prov-task-id-filter-exclude") + .displayName("Component ID to Exclude") + .description("Comma-separated list of component UUID that will be used to exclude the provenance events sent by the reporting task. If no " + + "filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. If a component UUID is included in " + + "Component ID to Include and excluded here, then the exclusion takes precedence and the event will not be sent.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder() .name("start-position") .displayName("Start Position") @@ -133,6 +164,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti // initialize component type filtering consumer.setComponentTypeRegex(context.getProperty(FILTER_COMPONENT_TYPE).getValue()); + consumer.setComponentTypeRegexExclude(context.getProperty(FILTER_COMPONENT_TYPE_EXCLUDE).getValue()); final String[] targetEventTypes = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_EVENT_TYPE).getValue(), ',')); if(targetEventTypes != null) { @@ -145,12 +177,28 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti } } + final String[] targetEventTypesExclude = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_EVENT_TYPE_EXCLUDE).getValue(), ',')); + if(targetEventTypesExclude != null) { + for(String type : targetEventTypesExclude) { + try { + consumer.addTargetEventTypeExclude(ProvenanceEventType.valueOf(type)); + } catch (Exception e) { + getLogger().warn(type + " is not a correct event type, removed from the exclude filtering."); + } + } + } + // initialize component ID filtering final String[] targetComponentIds = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_COMPONENT_ID).getValue(), ',')); if(targetComponentIds != null) { consumer.addTargetComponentId(targetComponentIds); } + final String[] targetComponentIdsExclude = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_COMPONENT_ID_EXCLUDE).getValue(), ',')); + if(targetComponentIdsExclude != null) { + consumer.addTargetComponentIdExclude(targetComponentIdsExclude); + } + consumer.setScheduled(true); } @@ -166,8 +214,11 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); properties.add(PLATFORM); properties.add(FILTER_EVENT_TYPE); + properties.add(FILTER_EVENT_TYPE_EXCLUDE); properties.add(FILTER_COMPONENT_TYPE); + properties.add(FILTER_COMPONENT_TYPE_EXCLUDE); properties.add(FILTER_COMPONENT_ID); + properties.add(FILTER_COMPONENT_ID_EXCLUDE); properties.add(START_POSITION); return properties; } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java index 26d5877d7e..31054c2471 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java @@ -272,6 +272,25 @@ public class TestSiteToSiteProvenanceReportingTask { assertEquals(3, task.dataSent.size()); } + @Test + public void testFilterComponentTypeExcludeSuccess() throws IOException, InitializationException { + final Map properties = new HashMap<>(); + for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) { + properties.put(descriptor, descriptor.getDefaultValue()); + } + properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000"); + properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_TYPE_EXCLUDE, "dummy.*"); + + ProvenanceEventRecord event = createProvenanceEventRecord(); + + MockSiteToSiteProvenanceReportingTask task = setup(event, properties); + task.initialize(initContext); + task.onScheduled(confContext); + task.onTrigger(context); + + assertEquals(0, task.dataSent.size()); + } + @Test public void testFilterComponentTypeNoResult() throws IOException, InitializationException { final Map properties = new HashMap<>(); @@ -291,6 +310,25 @@ public class TestSiteToSiteProvenanceReportingTask { assertEquals(0, task.dataSent.size()); } + @Test + public void testFilterComponentTypeNoResultExcluded() throws IOException, InitializationException { + final Map properties = new HashMap<>(); + for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) { + properties.put(descriptor, descriptor.getDefaultValue()); + } + properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000"); + properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_TYPE_EXCLUDE, "proc.*"); + + ProvenanceEventRecord event = createProvenanceEventRecord(); + + MockSiteToSiteProvenanceReportingTask task = setup(event, properties); + task.initialize(initContext); + task.onScheduled(confContext); + task.onTrigger(context); + + assertEquals(3, task.dataSent.size()); + } + @Test public void testFilterEventTypeSuccess() throws IOException, InitializationException { final Map properties = new HashMap<>(); @@ -310,6 +348,25 @@ public class TestSiteToSiteProvenanceReportingTask { assertEquals(3, task.dataSent.size()); } + @Test + public void testFilterEventTypeExcludeSuccess() throws IOException, InitializationException { + final Map properties = new HashMap<>(); + for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) { + properties.put(descriptor, descriptor.getDefaultValue()); + } + properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000"); + properties.put(SiteToSiteProvenanceReportingTask.FILTER_EVENT_TYPE_EXCLUDE, "RECEIVE, notExistingType, DROP"); + + ProvenanceEventRecord event = createProvenanceEventRecord(); + + MockSiteToSiteProvenanceReportingTask task = setup(event, properties); + task.initialize(initContext); + task.onScheduled(confContext); + task.onTrigger(context); + + assertEquals(0, task.dataSent.size()); + } + @Test public void testFilterEventTypeNoResult() throws IOException, InitializationException { final Map properties = new HashMap<>(); @@ -371,6 +428,26 @@ public class TestSiteToSiteProvenanceReportingTask { assertEquals(3, task.dataSent.size()); } + @Test + public void testFilterMultiFilterExcludeTakesPrecedence() throws IOException, InitializationException { + final Map properties = new HashMap<>(); + for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) { + properties.put(descriptor, descriptor.getDefaultValue()); + } + properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000"); + properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_TYPE_EXCLUDE, "dummy.*"); + properties.put(SiteToSiteProvenanceReportingTask.FILTER_EVENT_TYPE, "RECEIVE"); + + ProvenanceEventRecord event = createProvenanceEventRecord(); + + MockSiteToSiteProvenanceReportingTask task = setup(event, properties); + task.initialize(initContext); + task.onScheduled(confContext); + task.onTrigger(context); + + assertEquals(0, task.dataSent.size()); + } + @Test public void testFilterProcessGroupId() throws IOException, InitializationException { final Map properties = new HashMap<>();