NIFI-4768: Add exclusion filters to S2SProvenanceReportingTask

NIFI-4768: Updated exclusion logic per review comments

This closes #2397.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Matthew Burgess 2018-01-11 15:00:03 -05:00 committed by Koji Kawamura
parent 5e3867011e
commit 83d2930095
3 changed files with 188 additions and 14 deletions

View File

@ -32,6 +32,7 @@ import org.apache.nifi.reporting.ReportingContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -66,8 +67,11 @@ public class ProvenanceEventConsumer {
private String startPositionValue = PROVENANCE_START_POSITION.getDefaultValue();
private Pattern componentTypeRegex;
private List<ProvenanceEventType> eventTypes = new ArrayList<ProvenanceEventType>();
private List<String> componentIds = new ArrayList<String>();
private Pattern componentTypeRegexExclude;
private List<ProvenanceEventType> eventTypes = new ArrayList<>();
private List<ProvenanceEventType> eventTypesExclude = new ArrayList<>();
private List<String> componentIds = new ArrayList<>();
private List<String> componentIdsExclude = new ArrayList<>();
private int batchSize = Integer.parseInt(PROVENANCE_BATCH_SIZE.getDefaultValue());
private volatile long firstEventId = -1L;
@ -89,16 +93,26 @@ public class ProvenanceEventConsumer {
}
}
public void addTargetEventType(final ProvenanceEventType... types) {
for (ProvenanceEventType type : types) {
eventTypes.add(type);
/**
 * Configures the regular expression used to exclude events by component type.
 * A value that {@code StringUtils.isBlank} reports as blank is ignored, so the
 * previously configured exclusion pattern (if any) is left untouched.
 *
 * @param componentTypeRegex source of the exclusion pattern, compiled with {@link Pattern#compile(String)}
 */
public void setComponentTypeRegexExclude(final String componentTypeRegex) {
    if (StringUtils.isBlank(componentTypeRegex)) {
        return;
    }
    this.componentTypeRegexExclude = Pattern.compile(componentTypeRegex);
}
/**
 * Appends the given event types to the inclusion list. When that list is non-empty,
 * filterEvents skips every event whose type is not contained in it.
 *
 * @param types event types to include
 */
public void addTargetEventType(final ProvenanceEventType... types) {
    Collections.addAll(this.eventTypes, types);
}
/**
 * Appends the given event types to the exclusion list. filterEvents skips every
 * event whose type is contained in that list, before the inclusion list is consulted.
 *
 * @param types event types to exclude
 */
public void addTargetEventTypeExclude(final ProvenanceEventType... types) {
    Collections.addAll(this.eventTypesExclude, types);
}
public void addTargetComponentId(final String... ids) {
for (String id : ids) {
componentIds.add(id);
}
Collections.addAll(componentIds, ids);
}
/**
 * Appends the given component IDs to the exclusion list. filterEvents skips an event
 * when its component ID, its process group ID, or the ID of any parent process group
 * is contained in that list.
 *
 * @param ids component (or process group) IDs to exclude
 */
public void addTargetComponentIdExclude(final String... ids) {
    Collections.addAll(this.componentIdsExclude, ids);
}
public void setScheduled(boolean scheduled) {
@ -226,7 +240,8 @@ public class ProvenanceEventConsumer {
private boolean isFilteringEnabled() {
return componentTypeRegex != null || !eventTypes.isEmpty() || !componentIds.isEmpty();
return componentTypeRegex != null || !eventTypes.isEmpty() || !componentIds.isEmpty()
|| componentTypeRegexExclude != null || !eventTypesExclude.isEmpty() || !componentIdsExclude.isEmpty();
}
private List<ProvenanceEventRecord> filterEvents(ComponentMapHolder componentMapHolder, List<ProvenanceEventRecord> provenanceEvents) {
@ -234,7 +249,38 @@ public class ProvenanceEventConsumer {
List<ProvenanceEventRecord> filteredEvents = new ArrayList<>();
for (ProvenanceEventRecord provenanceEventRecord : provenanceEvents) {
if (!eventTypesExclude.isEmpty() && eventTypesExclude.contains(provenanceEventRecord.getEventType())) {
continue;
}
if (!eventTypes.isEmpty() && !eventTypes.contains(provenanceEventRecord.getEventType())) {
continue;
}
final String componentId = provenanceEventRecord.getComponentId();
if (!componentIdsExclude.isEmpty()) {
if (componentIdsExclude.contains(componentId)) {
continue;
}
// The event is not excluded by its own component ID, so check whether its process group
// or any parent process group is being excluded
if (componentMapHolder == null) {
continue;
}
final String processGroupId = componentMapHolder.getProcessGroupId(componentId, provenanceEventRecord.getComponentType());
if (!StringUtils.isEmpty(processGroupId)) {
// Check if the process group or any parent process group is specified as an excluded component ID.
if (componentIdsExclude.contains(processGroupId)) {
continue;
}
ParentProcessGroupSearchNode parentProcessGroup = componentMapHolder.getProcessGroupParent(processGroupId);
while (parentProcessGroup != null && !componentIdsExclude.contains(parentProcessGroup.getId())) {
parentProcessGroup = parentProcessGroup.getParent();
}
if (parentProcessGroup != null) {
continue;
}
}
}
if (!componentIds.isEmpty() && !componentIds.contains(componentId)) {
// If we aren't filtering it out based on component ID, let's see if this component has a parent process group IDs
// that is being filtered on
@ -245,7 +291,6 @@ public class ProvenanceEventConsumer {
if (StringUtils.isEmpty(processGroupId)) {
continue;
}
// Check if the process group or any parent process group is specified as a target component ID.
if (!componentIds.contains(processGroupId)) {
ParentProcessGroupSearchNode parentProcessGroup = componentMapHolder.getProcessGroupParent(processGroupId);
while (parentProcessGroup != null && !componentIds.contains(parentProcessGroup.getId())) {
@ -256,7 +301,8 @@ public class ProvenanceEventConsumer {
}
}
}
if (!eventTypes.isEmpty() && !eventTypes.contains(provenanceEventRecord.getEventType())) {
if (componentTypeRegexExclude != null && componentTypeRegexExclude.matcher(provenanceEventRecord.getComponentType()).matches()) {
continue;
}
if (componentTypeRegex != null && !componentTypeRegex.matcher(provenanceEventRecord.getComponentType()).matches()) {

View File

@ -87,7 +87,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
static final PropertyDescriptor FILTER_EVENT_TYPE = new PropertyDescriptor.Builder()
.name("s2s-prov-task-event-filter")
.displayName("Event Type")
.displayName("Event Type to Include")
.description("Comma-separated list of event types that will be used to filter the provenance events sent by the reporting task. "
+ "Available event types are " + Arrays.deepToString(ProvenanceEventType.values()) + ". If no filter is set, all the events are sent. If "
+ "multiple filters are set, the filters are cumulative.")
@ -95,24 +95,55 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
/**
 * Optional property holding a comma-separated list of provenance event types whose events must not be sent.
 * onScheduled splits the value on ',' and parses each entry with ProvenanceEventType.valueOf; entries that
 * cannot be parsed are logged at warn level and left out of the exclusion filter.
 */
static final PropertyDescriptor FILTER_EVENT_TYPE_EXCLUDE = new PropertyDescriptor.Builder()
.name("s2s-prov-task-event-filter-exclude")
.displayName("Event Type to Exclude")
.description("Comma-separated list of event types that will be used to exclude the provenance events sent by the reporting task. "
+ "Available event types are " + Arrays.deepToString(ProvenanceEventType.values()) + ". If no filter is set, all the events are sent. If "
+ "multiple filters are set, the filters are cumulative. If an event type is included in Event Type to Include and excluded here, then the "
+ "exclusion takes precedence and the event will not be sent.")
.required(false)
// NOTE(review): judging by its name this validator only rejects empty values; event type names are checked later, in onScheduled
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor FILTER_COMPONENT_TYPE = new PropertyDescriptor.Builder()
.name("s2s-prov-task-type-filter")
.displayName("Component Type")
.displayName("Component Type to Include")
.description("Regular expression to filter the provenance events based on the component type. Only the events matching the regular "
+ "expression will be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
.required(false)
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build();
/**
 * Optional property holding a regular expression; events whose component type matches it are not sent.
 * onScheduled passes the raw value to the consumer's setComponentTypeRegexExclude, and filterEvents applies
 * the compiled pattern with Matcher.matches(), i.e. against the whole component type string.
 */
static final PropertyDescriptor FILTER_COMPONENT_TYPE_EXCLUDE = new PropertyDescriptor.Builder()
.name("s2s-prov-task-type-filter-exclude")
.displayName("Component Type to Exclude")
.description("Regular expression to exclude the provenance events based on the component type. The events matching the regular "
+ "expression will not be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. "
+ "If a component type is included in Component Type to Include and excluded here, then the exclusion takes precedence and the event will not be sent.")
.required(false)
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build();
static final PropertyDescriptor FILTER_COMPONENT_ID = new PropertyDescriptor.Builder()
.name("s2s-prov-task-id-filter")
.displayName("Component ID")
.displayName("Component ID to Include")
.description("Comma-separated list of component UUID that will be used to filter the provenance events sent by the reporting task. If no "
+ "filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
/**
 * Optional property holding a comma-separated list of component UUIDs whose events must not be sent.
 * onScheduled splits the value on ',' and strips each entry before handing the IDs to the consumer's
 * addTargetComponentIdExclude. In filterEvents an event is dropped when its component ID, its process
 * group ID, or any parent process group ID appears in this list.
 */
static final PropertyDescriptor FILTER_COMPONENT_ID_EXCLUDE = new PropertyDescriptor.Builder()
.name("s2s-prov-task-id-filter-exclude")
.displayName("Component ID to Exclude")
.description("Comma-separated list of component UUID that will be used to exclude the provenance events sent by the reporting task. If no "
+ "filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. If a component UUID is included in "
+ "Component ID to Include and excluded here, then the exclusion takes precedence and the event will not be sent.")
.required(false)
// NOTE(review): judging by its name this validator only rejects empty values; the UUID format itself is not checked here
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder()
.name("start-position")
.displayName("Start Position")
@ -133,6 +164,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
// initialize component type filtering
consumer.setComponentTypeRegex(context.getProperty(FILTER_COMPONENT_TYPE).getValue());
consumer.setComponentTypeRegexExclude(context.getProperty(FILTER_COMPONENT_TYPE_EXCLUDE).getValue());
final String[] targetEventTypes = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_EVENT_TYPE).getValue(), ','));
if(targetEventTypes != null) {
@ -145,12 +177,28 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
}
}
final String[] targetEventTypesExclude = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_EVENT_TYPE_EXCLUDE).getValue(), ','));
if(targetEventTypesExclude != null) {
for(String type : targetEventTypesExclude) {
try {
consumer.addTargetEventTypeExclude(ProvenanceEventType.valueOf(type));
} catch (Exception e) {
getLogger().warn(type + " is not a correct event type, removed from the exclude filtering.");
}
}
}
// initialize component ID filtering
final String[] targetComponentIds = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_COMPONENT_ID).getValue(), ','));
if(targetComponentIds != null) {
consumer.addTargetComponentId(targetComponentIds);
}
final String[] targetComponentIdsExclude = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_COMPONENT_ID_EXCLUDE).getValue(), ','));
if(targetComponentIdsExclude != null) {
consumer.addTargetComponentIdExclude(targetComponentIdsExclude);
}
consumer.setScheduled(true);
}
@ -166,8 +214,11 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.add(PLATFORM);
properties.add(FILTER_EVENT_TYPE);
properties.add(FILTER_EVENT_TYPE_EXCLUDE);
properties.add(FILTER_COMPONENT_TYPE);
properties.add(FILTER_COMPONENT_TYPE_EXCLUDE);
properties.add(FILTER_COMPONENT_ID);
properties.add(FILTER_COMPONENT_ID_EXCLUDE);
properties.add(START_POSITION);
return properties;
}

View File

@ -272,6 +272,25 @@ public class TestSiteToSiteProvenanceReportingTask {
assertEquals(3, task.dataSent.size());
}
/**
 * Verifies that nothing is sent when the component-type exclusion regex ("dummy.*") is expected to
 * match the component type of the record produced by createProvenanceEventRecord().
 */
@Test
public void testFilterComponentTypeExcludeSuccess() throws IOException, InitializationException {
    // Seed every supported property with its default value, then override the ones under test.
    final Map<PropertyDescriptor, String> properties = new HashMap<>();
    for (final PropertyDescriptor supported : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
        properties.put(supported, supported.getDefaultValue());
    }
    properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
    properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_TYPE_EXCLUDE, "dummy.*");

    final ProvenanceEventRecord record = createProvenanceEventRecord();
    final MockSiteToSiteProvenanceReportingTask reportingTask = setup(record, properties);

    reportingTask.initialize(initContext);
    reportingTask.onScheduled(confContext);
    reportingTask.onTrigger(context);

    // Every event is excluded, so no data may have been sent.
    assertEquals(0, reportingTask.dataSent.size());
}
@Test
public void testFilterComponentTypeNoResult() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
@ -291,6 +310,25 @@ public class TestSiteToSiteProvenanceReportingTask {
assertEquals(0, task.dataSent.size());
}
/**
 * Verifies that data is still sent when the component-type exclusion regex ("proc.*") is expected
 * not to match the record under test: the exclusion removes nothing and dataSent holds 3 entries.
 */
@Test
public void testFilterComponentTypeNoResultExcluded() throws IOException, InitializationException {
    // Seed every supported property with its default value, then override the ones under test.
    final Map<PropertyDescriptor, String> properties = new HashMap<>();
    for (final PropertyDescriptor supported : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
        properties.put(supported, supported.getDefaultValue());
    }
    properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
    properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_TYPE_EXCLUDE, "proc.*");

    final ProvenanceEventRecord record = createProvenanceEventRecord();
    final MockSiteToSiteProvenanceReportingTask reportingTask = setup(record, properties);

    reportingTask.initialize(initContext);
    reportingTask.onScheduled(confContext);
    reportingTask.onTrigger(context);

    // Nothing was excluded, so the usual amount of data is expected.
    assertEquals(3, reportingTask.dataSent.size());
}
@Test
public void testFilterEventTypeSuccess() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
@ -310,6 +348,25 @@ public class TestSiteToSiteProvenanceReportingTask {
assertEquals(3, task.dataSent.size());
}
/**
 * Verifies that nothing is sent when the event-type exclusion list names the type of the record under
 * test. The list deliberately contains an unknown name ("notExistingType"), which must be tolerated.
 */
@Test
public void testFilterEventTypeExcludeSuccess() throws IOException, InitializationException {
    // Seed every supported property with its default value, then override the ones under test.
    final Map<PropertyDescriptor, String> properties = new HashMap<>();
    for (final PropertyDescriptor supported : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
        properties.put(supported, supported.getDefaultValue());
    }
    properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
    properties.put(SiteToSiteProvenanceReportingTask.FILTER_EVENT_TYPE_EXCLUDE, "RECEIVE, notExistingType, DROP");

    final ProvenanceEventRecord record = createProvenanceEventRecord();
    final MockSiteToSiteProvenanceReportingTask reportingTask = setup(record, properties);

    reportingTask.initialize(initContext);
    reportingTask.onScheduled(confContext);
    reportingTask.onTrigger(context);

    // Every event is excluded, so no data may have been sent.
    assertEquals(0, reportingTask.dataSent.size());
}
@Test
public void testFilterEventTypeNoResult() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
@ -371,6 +428,26 @@ public class TestSiteToSiteProvenanceReportingTask {
assertEquals(3, task.dataSent.size());
}
/**
 * Verifies that an exclusion filter wins when it is combined with an inclusion filter: the event type
 * "RECEIVE" is included, but the component-type exclusion regex ("dummy.*") still results in no data
 * being sent.
 */
@Test
public void testFilterMultiFilterExcludeTakesPrecedence() throws IOException, InitializationException {
    // Seed every supported property with its default value, then override the ones under test.
    final Map<PropertyDescriptor, String> properties = new HashMap<>();
    for (final PropertyDescriptor supported : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
        properties.put(supported, supported.getDefaultValue());
    }
    properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
    properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_TYPE_EXCLUDE, "dummy.*");
    properties.put(SiteToSiteProvenanceReportingTask.FILTER_EVENT_TYPE, "RECEIVE");

    final ProvenanceEventRecord record = createProvenanceEventRecord();
    final MockSiteToSiteProvenanceReportingTask reportingTask = setup(record, properties);

    reportingTask.initialize(initContext);
    reportingTask.onScheduled(confContext);
    reportingTask.onTrigger(context);

    // The exclusion applies despite the inclusion filter, so no data may have been sent.
    assertEquals(0, reportingTask.dataSent.size());
}
@Test
public void testFilterProcessGroupId() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();