NIFI-3859 - Provide filtering options in S2SProvenanceReportingTask

This closes #1777.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Pierre Villard 2017-05-09 20:44:43 +02:00 committed by Koji Kawamura
parent eaefec6d81
commit b6eb0ac0fb
2 changed files with 300 additions and 32 deletions

View File

@ -17,13 +17,16 @@
package org.apache.nifi.reporting;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
@ -31,6 +34,7 @@ import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
@ -47,6 +51,7 @@ import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -55,6 +60,7 @@ import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@Tags({"provenance", "lineage", "tracking", "site", "site to site", "restricted"})
@CapabilityDescription("Publishes Provenance events using the Site To Site protocol.")
@ -67,6 +73,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
.name("Platform")
.displayName("Platform")
.description("The value to use for the platform field in each provenance event.")
.required(true)
.expressionLanguageSupported(true)
@ -74,12 +81,73 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
/**
 * Optional comma-separated list of provenance event type names. When set, only events whose
 * type is in the list are sent. The description enumerates the valid names; the enum array is
 * rendered with {@link Arrays#toString(Object[])} because concatenating the array directly
 * would print its identity string instead of the type names.
 */
static final PropertyDescriptor FILTER_EVENT_TYPE = new PropertyDescriptor.Builder()
        .name("s2s-prov-task-event-filter")
        .displayName("Event type")
        .description("Comma-separated list of event types that will be used to filter the provenance events sent by the reporting task. "
                + "Available event types are " + Arrays.toString(ProvenanceEventType.values()) + ". If no filter is set, all the events are sent. If "
                + "multiple filters are set, the filters are cumulative.")
        .required(false)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .build();
/**
 * Optional regular expression applied to the component type of each provenance event.
 * The value is compiled into a {@link Pattern} in {@code onScheduled} and must match the
 * whole component type (filterEvents uses {@code Matcher.matches()}) for the event to be kept.
 */
static final PropertyDescriptor FILTER_COMPONENT_TYPE = new PropertyDescriptor.Builder()
.name("s2s-prov-task-type-filter")
.displayName("Component type")
.description("Regular expression to filter the provenance events based on the component type. Only the events matching the regular "
+ "expression will be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
.required(false)
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build();
/**
 * Optional comma-separated list of component identifiers. The value is split on commas and
 * each entry is stripped of surrounding whitespace in {@code onScheduled}; an event is kept
 * only if its component ID is exactly equal to one of the entries.
 */
static final PropertyDescriptor FILTER_COMPONENT_ID = new PropertyDescriptor.Builder()
.name("s2s-prov-task-id-filter")
.displayName("Component ID")
.description("Comma-separated list of component UUID that will be used to filter the provenance events sent by the reporting task. If no "
+ "filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
// ID passed to getProvenanceEvents() as the first event to retrieve; -1 until a starting point is known.
private volatile long firstEventId = -1L;
// True when at least one of the three filters below is configured; computed in onScheduled.
private volatile boolean isFilteringEnabled = false;
// Compiled component-type filter, or null when the property is blank.
private volatile Pattern componentTypeRegex;
// Event types accepted by the filter; an empty list means "no event type filtering".
private volatile List<ProvenanceEventType> eventTypes = new ArrayList<ProvenanceEventType>();
// Component IDs accepted by the filter; an empty list means "no component ID filtering".
private volatile List<String> componentIds = new ArrayList<String>();
/**
 * Reads the three optional filter properties and rebuilds the in-memory filter state.
 *
 * <p>The event type and component ID lists are rebuilt from scratch on every invocation, so
 * that stopping the task, editing a filter and starting it again does not leave values from
 * the previous configuration active.</p>
 *
 * @param context the configuration context used to look up the filter property values
 * @throws IOException declared for interface compatibility; not thrown by this implementation
 */
@OnScheduled
public void onScheduled(final ConfigurationContext context) throws IOException {
    // initialize component type filtering
    final String componentTypeFilter = context.getProperty(FILTER_COMPONENT_TYPE).getValue();
    componentTypeRegex = StringUtils.isBlank(componentTypeFilter) ? null : Pattern.compile(componentTypeFilter);

    // initialize event type filtering, starting from an empty list so earlier schedules do not leak in
    final List<ProvenanceEventType> configuredEventTypes = new ArrayList<>();
    final String[] eventList = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_EVENT_TYPE).getValue(), ','));
    if (eventList != null) {
        for (final String type : eventList) {
            try {
                configuredEventTypes.add(ProvenanceEventType.valueOf(type));
            } catch (final IllegalArgumentException e) {
                // Unknown names are skipped on purpose: one typo must not disable the whole task.
                getLogger().warn(type + " is not a correct event type, removed from the filtering.");
            }
        }
    }
    eventTypes = configuredEventTypes;

    // initialize component ID filtering, again from a fresh list
    final List<String> configuredComponentIds = new ArrayList<>();
    final String[] componentIdList = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_COMPONENT_ID).getValue(), ','));
    if (componentIdList != null) {
        configuredComponentIds.addAll(Arrays.asList(componentIdList));
    }
    componentIds = configuredComponentIds;

    // set a boolean whether filtering will be applied or not
    isFilteringEnabled = componentTypeRegex != null || !configuredEventTypes.isEmpty() || !configuredComponentIds.isEmpty();
}
/**
 * Returns the descriptors inherited from the parent task followed by the platform property
 * and the three filter properties, in that order.
 */
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
    Collections.addAll(descriptors, PLATFORM, FILTER_EVENT_TYPE, FILTER_COMPONENT_TYPE, FILTER_COMPONENT_ID);
    return descriptors;
}
@ -160,7 +228,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
List<ProvenanceEventRecord> events;
try {
events = context.getEventAccess().getProvenanceEvents(firstEventId, context.getProperty(BATCH_SIZE).asInteger());
events = filterEvents(context.getEventAccess().getProvenanceEvents(firstEventId, context.getProperty(BATCH_SIZE).asInteger()));
} catch (final IOException ioe) {
getLogger().error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe);
return;
@ -243,7 +311,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
// Retrieve the next batch
try {
events = context.getEventAccess().getProvenanceEvents(firstEventId, context.getProperty(BATCH_SIZE).asInteger());
events = filterEvents(context.getEventAccess().getProvenanceEvents(firstEventId, context.getProperty(BATCH_SIZE).asInteger()));
} catch (final IOException ioe) {
getLogger().error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe);
return;
@ -252,6 +320,29 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
}
/**
 * Keeps only the events that pass every configured filter (component ID, event type,
 * component type). When no filter is configured the input list itself is returned.
 *
 * @param provenanceEvents the events retrieved from the repository
 * @return the same list when filtering is disabled, otherwise a new list of accepted events
 */
private List<ProvenanceEventRecord> filterEvents(List<ProvenanceEventRecord> provenanceEvents) {
    if (!isFilteringEnabled) {
        return provenanceEvents;
    }

    final List<ProvenanceEventRecord> accepted = new ArrayList<ProvenanceEventRecord>();
    for (final ProvenanceEventRecord candidate : provenanceEvents) {
        // Short-circuit in the same order as the individual checks: ID, then event type, then component type.
        final boolean rejected =
                (!componentIds.isEmpty() && !componentIds.contains(candidate.getComponentId()))
                || (!eventTypes.isEmpty() && !eventTypes.contains(candidate.getEventType()))
                || (componentTypeRegex != null && !componentTypeRegex.matcher(candidate.getComponentType()).matches());
        if (!rejected) {
            accepted.add(candidate);
        }
    }
    return accepted;
}
static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final ProvenanceEventRecord event, final DateFormat df,
final String componentName, final String hostname, final URL nifiUrl, final String applicationName, final String platform, final String nodeIdentifier) {
addField(builder, "eventId", UUID.randomUUID().toString());

View File

@ -21,6 +21,7 @@ package org.apache.nifi.reporting;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
@ -32,7 +33,6 @@ import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockPropertyValue;
import org.junit.Assert;
@ -44,6 +44,7 @@ import org.mockito.stubbing.Answer;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@ -57,37 +58,13 @@ import static org.junit.Assert.assertEquals;
public class TestSiteToSiteProvenanceReportingTask {
@Test
public void testSerializedForm() throws IOException, InitializationException {
final String uuid = "10000000-0000-0000-0000-000000000000";
final Map<String, String> attributes = new HashMap<>();
attributes.put("abc", "xyz");
attributes.put("xyz", "abc");
attributes.put("filename", "file-" + uuid);
final Map<String, String> prevAttrs = new HashMap<>();
attributes.put("filename", "1234.xyz");
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
builder.setEventTime(System.currentTimeMillis());
builder.setEventType(ProvenanceEventType.RECEIVE);
builder.setTransitUri("nifi://unit-test");
attributes.put("uuid", uuid);
builder.fromFlowFile(createFlowFile(3L, attributes));
builder.setAttributes(prevAttrs, attributes);
builder.setComponentId("1234");
builder.setComponentType("dummy processor");
final ProvenanceEventRecord event = builder.build();
private final ReportingContext context = Mockito.mock(ReportingContext.class);
private final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
private final ConfigurationContext confContext = Mockito.mock(ConfigurationContext.class);
private MockSiteToSiteProvenanceReportingTask setup(ProvenanceEventRecord event, Map<PropertyDescriptor, String> properties) throws IOException {
final MockSiteToSiteProvenanceReportingTask task = new MockSiteToSiteProvenanceReportingTask();
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
final ReportingContext context = Mockito.mock(ReportingContext.class);
Mockito.when(context.getStateManager())
.thenReturn(new MockStateManager(task));
Mockito.doAnswer(new Answer<PropertyValue>() {
@ -98,6 +75,14 @@ public class TestSiteToSiteProvenanceReportingTask {
}
}).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
Mockito.doAnswer(new Answer<PropertyValue>() {
@Override
public PropertyValue answer(final InvocationOnMock invocation) throws Throwable {
final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class);
return new MockPropertyValue(properties.get(descriptor));
}
}).when(confContext).getProperty(Mockito.any(PropertyDescriptor.class));
final long maxEventId = 2500;
final AtomicInteger totalEvents = new AtomicInteger(0);
@ -129,12 +114,25 @@ public class TestSiteToSiteProvenanceReportingTask {
Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
final ComponentLog logger = Mockito.mock(ComponentLog.class);
final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
Mockito.when(initContext.getLogger()).thenReturn(logger);
return task;
}
@Test
public void testSerializedForm() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
ProvenanceEventRecord event = createProvenanceEventRecord();
MockSiteToSiteProvenanceReportingTask task = setup(event, properties);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
assertEquals(3, task.dataSent.size());
@ -144,6 +142,163 @@ public class TestSiteToSiteProvenanceReportingTask {
assertEquals(msgArray.getString("abc"), event.getAttributes().get("abc"));
}
/**
 * A component ID filter that contains the test event's component ID ("1234") must not drop the event.
 */
@Test
public void testFilterComponentIdSuccess() throws IOException, InitializationException {
    final Map<PropertyDescriptor, String> config = new HashMap<>();
    for (final PropertyDescriptor supported : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
        config.put(supported, supported.getDefaultValue());
    }
    config.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
    config.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_ID, "2345, 5678, 1234");

    final MockSiteToSiteProvenanceReportingTask reportingTask = setup(createProvenanceEventRecord(), config);
    reportingTask.initialize(initContext);
    reportingTask.onScheduled(confContext);
    reportingTask.onTrigger(context);

    assertEquals(3, reportingTask.dataSent.size());
}
/**
 * A component ID filter that does not contain the test event's component ID must result in nothing being sent.
 */
@Test
public void testFilterComponentIdNoResult() throws IOException, InitializationException {
    final Map<PropertyDescriptor, String> config = new HashMap<>();
    for (final PropertyDescriptor supported : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
        config.put(supported, supported.getDefaultValue());
    }
    config.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
    config.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_ID, "9999");

    final MockSiteToSiteProvenanceReportingTask reportingTask = setup(createProvenanceEventRecord(), config);
    reportingTask.initialize(initContext);
    reportingTask.onScheduled(confContext);
    reportingTask.onTrigger(context);

    assertEquals(0, reportingTask.dataSent.size());
}
/**
 * A component type regex that matches the test event's type ("dummy processor") must not drop the event.
 */
@Test
public void testFilterComponentTypeSuccess() throws IOException, InitializationException {
    final Map<PropertyDescriptor, String> config = new HashMap<>();
    for (final PropertyDescriptor supported : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
        config.put(supported, supported.getDefaultValue());
    }
    config.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
    config.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_TYPE, "dummy.*");

    final MockSiteToSiteProvenanceReportingTask reportingTask = setup(createProvenanceEventRecord(), config);
    reportingTask.initialize(initContext);
    reportingTask.onScheduled(confContext);
    reportingTask.onTrigger(context);

    assertEquals(3, reportingTask.dataSent.size());
}
/**
 * A component type regex that does not match the whole component type must result in nothing being sent.
 */
@Test
public void testFilterComponentTypeNoResult() throws IOException, InitializationException {
    final Map<PropertyDescriptor, String> config = new HashMap<>();
    for (final PropertyDescriptor supported : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
        config.put(supported, supported.getDefaultValue());
    }
    config.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
    config.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_TYPE, "proc.*");

    final MockSiteToSiteProvenanceReportingTask reportingTask = setup(createProvenanceEventRecord(), config);
    reportingTask.initialize(initContext);
    reportingTask.onScheduled(confContext);
    reportingTask.onTrigger(context);

    assertEquals(0, reportingTask.dataSent.size());
}
/**
 * An event type filter that contains RECEIVE keeps the test event; the unknown name in the
 * list is expected to be ignored rather than to break the filter.
 */
@Test
public void testFilterEventTypeSuccess() throws IOException, InitializationException {
    final Map<PropertyDescriptor, String> config = new HashMap<>();
    for (final PropertyDescriptor supported : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
        config.put(supported, supported.getDefaultValue());
    }
    config.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
    config.put(SiteToSiteProvenanceReportingTask.FILTER_EVENT_TYPE, "RECEIVE, notExistingType, DROP");

    final MockSiteToSiteProvenanceReportingTask reportingTask = setup(createProvenanceEventRecord(), config);
    reportingTask.initialize(initContext);
    reportingTask.onScheduled(confContext);
    reportingTask.onTrigger(context);

    assertEquals(3, reportingTask.dataSent.size());
}
/**
 * An event type filter that only lists DROP must exclude the RECEIVE test event, so nothing is sent.
 */
@Test
public void testFilterEventTypeNoResult() throws IOException, InitializationException {
    final Map<PropertyDescriptor, String> config = new HashMap<>();
    for (final PropertyDescriptor supported : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
        config.put(supported, supported.getDefaultValue());
    }
    config.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
    config.put(SiteToSiteProvenanceReportingTask.FILTER_EVENT_TYPE, "DROP");

    final MockSiteToSiteProvenanceReportingTask reportingTask = setup(createProvenanceEventRecord(), config);
    reportingTask.initialize(initContext);
    reportingTask.onScheduled(confContext);
    reportingTask.onTrigger(context);

    assertEquals(0, reportingTask.dataSent.size());
}
/**
 * With all three filters set, a single non-matching filter (event type DROP) is enough to exclude the event.
 */
@Test
public void testFilterMultiFilterNoResult() throws IOException, InitializationException {
    final Map<PropertyDescriptor, String> config = new HashMap<>();
    for (final PropertyDescriptor supported : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
        config.put(supported, supported.getDefaultValue());
    }
    config.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
    config.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_ID, "2345, 5678, 1234");
    config.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_TYPE, "dummy.*");
    config.put(SiteToSiteProvenanceReportingTask.FILTER_EVENT_TYPE, "DROP");

    final MockSiteToSiteProvenanceReportingTask reportingTask = setup(createProvenanceEventRecord(), config);
    reportingTask.initialize(initContext);
    reportingTask.onScheduled(confContext);
    reportingTask.onTrigger(context);

    assertEquals(0, reportingTask.dataSent.size());
}
/**
 * With all three filters set and all of them matching the test event, the event is still sent.
 */
@Test
public void testFilterMultiFilterSuccess() throws IOException, InitializationException {
    final Map<PropertyDescriptor, String> config = new HashMap<>();
    for (final PropertyDescriptor supported : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
        config.put(supported, supported.getDefaultValue());
    }
    config.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
    config.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_ID, "2345, 5678, 1234");
    config.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_TYPE, "dummy.*");
    config.put(SiteToSiteProvenanceReportingTask.FILTER_EVENT_TYPE, "RECEIVE");

    final MockSiteToSiteProvenanceReportingTask reportingTask = setup(createProvenanceEventRecord(), config);
    reportingTask.initialize(initContext);
    reportingTask.onScheduled(confContext);
    reportingTask.onTrigger(context);

    assertEquals(3, reportingTask.dataSent.size());
}
@Test
public void testWhenProvenanceMaxIdEqualToLastEventIdInStateManager() throws IOException, InitializationException {
final long maxEventId = 2500;
@ -194,6 +349,28 @@ public class TestSiteToSiteProvenanceReportingTask {
return mockFlowFile;
}
/**
 * Builds the single RECEIVE provenance event shared by the tests: component ID "1234",
 * component type "dummy processor", and a small fixed set of FlowFile attributes.
 */
private ProvenanceEventRecord createProvenanceEventRecord() {
    final String flowFileUuid = "10000000-0000-0000-0000-000000000000";

    final Map<String, String> updatedAttributes = new HashMap<>();
    updatedAttributes.put("abc", "xyz");
    updatedAttributes.put("xyz", "abc");
    updatedAttributes.put("filename", "file-" + flowFileUuid);
    // NOTE(review): this second put overwrites "filename" in the same map and the previous-attributes
    // map below stays empty; kept as-is to preserve the fixture exactly — confirm whether that is intended.
    updatedAttributes.put("filename", "1234.xyz");
    updatedAttributes.put("uuid", flowFileUuid);

    final Map<String, String> previousAttributes = new HashMap<>();

    final ProvenanceEventBuilder eventBuilder = new StandardProvenanceEventRecord.Builder();
    eventBuilder.setEventTime(System.currentTimeMillis());
    eventBuilder.setEventType(ProvenanceEventType.RECEIVE);
    eventBuilder.setTransitUri("nifi://unit-test");
    eventBuilder.fromFlowFile(createFlowFile(3L, updatedAttributes));
    eventBuilder.setAttributes(previousAttributes, updatedAttributes);
    eventBuilder.setComponentId("1234");
    eventBuilder.setComponentType("dummy processor");

    return eventBuilder.build();
}
private static final class MockSiteToSiteProvenanceReportingTask extends SiteToSiteProvenanceReportingTask {
final List<byte[]> dataSent = new ArrayList<>();