From 2b435cdfc6fd0824d9eb5f2cf140a330c9f258ed Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 26 May 2017 10:59:47 -0400 Subject: [PATCH] NIFI-3985: This closes #1864. Added 'Starting Position' property to SiteToSiteReportingTask; also added additionalDetails.html that explains the schema and updated the reporting task to stop publishing when the user clicks 'stops' instead of running indefinitely until the reporting task has caught up Signed-off-by: joewitt --- .../SiteToSiteProvenanceReportingTask.java | 54 +++++++++++-- .../additionalDetails.html | 77 +++++++++++++++++++ ...TestSiteToSiteProvenanceReportingTask.java | 23 +++--- 3 files changed, 135 insertions(+), 19 deletions(-) create mode 100644 nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java index cae4d17b98..37b5070b77 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java @@ -23,6 +23,8 @@ import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; import 
org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateManager; @@ -71,6 +73,11 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; static final String LAST_EVENT_ID_KEY = "last_event_id"; + static final AllowableValue BEGINNING_OF_STREAM = new AllowableValue("beginning-of-stream", "Beginning of Stream", + "Start reading provenance Events from the beginning of the stream (the oldest event first)"); + static final AllowableValue END_OF_STREAM = new AllowableValue("end-of-stream", "End of Stream", + "Start reading provenance Events from the end of the stream, ignoring old events"); + static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder() .name("Platform") .displayName("Platform") @@ -83,7 +90,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti static final PropertyDescriptor FILTER_EVENT_TYPE = new PropertyDescriptor.Builder() .name("s2s-prov-task-event-filter") - .displayName("Event type") + .displayName("Event Type") .description("Comma-separated list of event types that will be used to filter the provenance events sent by the reporting task. " + "Available event types are " + ProvenanceEventType.values() + ". If no filter is set, all the events are sent. If " + "multiple filters are set, the filters are cumulative.") @@ -93,7 +100,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti static final PropertyDescriptor FILTER_COMPONENT_TYPE = new PropertyDescriptor.Builder() .name("s2s-prov-task-type-filter") - .displayName("Component type") + .displayName("Component Type") .description("Regular expression to filter the provenance events based on the component type. Only the events matching the regular " + "expression will be sent. If no filter is set, all the events are sent. 
If multiple filters are set, the filters are cumulative.") .required(false) @@ -109,11 +116,21 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder() + .name("start-position") + .displayName("Start Position") + .description("If the Reporting Task has never been run, or if its state has been reset by a user, specifies where in the stream of Provenance Events the Reporting Task should start") + .allowableValues(BEGINNING_OF_STREAM, END_OF_STREAM) + .defaultValue(BEGINNING_OF_STREAM.getValue()) + .required(true) + .build(); + private volatile long firstEventId = -1L; private volatile boolean isFilteringEnabled = false; private volatile Pattern componentTypeRegex; private volatile List eventTypes = new ArrayList(); private volatile List componentIds = new ArrayList(); + private volatile boolean scheduled = false; @OnScheduled public void onScheduled(final ConfigurationContext context) throws IOException { @@ -139,6 +156,17 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti // set a boolean whether filtering will be applied or not isFilteringEnabled = componentTypeRegex != null || !eventTypes.isEmpty() || !componentIds.isEmpty(); + + scheduled = true; + } + + @OnUnscheduled + public void onUnscheduled() { + scheduled = false; + } + + public boolean isScheduled() { + return scheduled; } @Override @@ -148,6 +176,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti properties.add(FILTER_EVENT_TYPE); properties.add(FILTER_COMPONENT_TYPE); properties.add(FILTER_COMPONENT_ID); + properties.add(START_POSITION); return properties; } @@ -210,14 +239,27 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti getLogger().error("Failed to get state at start up due to:" + e.getMessage(), e); return; } + + final 
String startPositionValue = context.getProperty(START_POSITION).getValue(); + if (state.containsKey(LAST_EVENT_ID_KEY)) { firstEventId = Long.parseLong(state.get(LAST_EVENT_ID_KEY)) + 1; + } else { + if (END_OF_STREAM.getValue().equals(startPositionValue)) { + firstEventId = currMaxId; + } } - if(currMaxId < (firstEventId - 1)){ - getLogger().warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " + + if (currMaxId < (firstEventId - 1)) { + if (BEGINNING_OF_STREAM.getValue().equals(startPositionValue)) { + getLogger().warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " + "ids. Restarting querying from the beginning.", new Object[]{currMaxId, firstEventId}); - firstEventId = -1; + firstEventId = -1; + } else { + getLogger().warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " + + "ids. 
Restarting querying from the latest event in the Provenance Repository.", new Object[] {currMaxId, firstEventId}); + firstEventId = currMaxId; + } } } @@ -258,7 +300,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT); df.setTimeZone(TimeZone.getTimeZone("Z")); - while (events != null && !events.isEmpty()) { + while (events != null && !events.isEmpty() && isScheduled()) { final long start = System.nanoTime(); // Create a JSON array of all the events in the current batch diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html new file mode 100644 index 0000000000..7e8204c73d --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html @@ -0,0 +1,77 @@ + + + + + + SiteToSiteProvenanceReportingTask + + + + + +

+ The Site-to-Site Provenance Reporting Task allows the user to publish all of the Provenance Events from a NiFi instance back to + the same NiFi instance or another NiFi instance. This provides a great deal of power because it allows the user to make use of + all of the different Processors that are available in NiFi in order to process or distribute that data. When possible, it is + advisable to send the Provenance data to a different NiFi instance than the one that this Reporting Task is running on, because + when the data is received over Site-to-Site and processed, that in and of itself will generate Provenance events. As a result, there + is a cycle that is created. However, the data is sent in batches (1,000 by default). This means that for each batch of Provenance events + that are sent back to NiFi, the receiving NiFi will have to generate only a single event per component. 

+ +

+ When published to a NiFi instance, the Provenance data is sent as a JSON array. Quite often, it can be useful to work with this data using + a schema. As such, the schema for this Provenance data can be defined as follows: +

+ +
+
+{
+  "namespace": "nifi",
+  "name": "provenanceEvent",
+  "type": "record",
+  "fields": [
+    { "name": "eventId", "type": "string" },
+    { "name": "eventOrdinal", "type": "long" },
+    { "name": "eventType", "type": "string" },
+    { "name": "timestampMillis", "type": "long" },
+    { "name": "durationMillis", "type": "long" },
+    { "name": "lineageStart", "type": { "type": "long", "logicalType": "timestamp-millis" } },
+    { "name": "details", "type": "string" },
+    { "name": "componentId", "type": "string" },
+    { "name": "componentType", "type": "string" },
+    { "name": "entityId", "type": "string" },
+    { "name": "entityType", "type": "string" },
+    { "name": "entitySize", "type": ["null", "long"] },
+    { "name": "previousEntitySize", "type": ["null", "long"] },
+    { "name": "updatedAttributes", "type": { "type": "map", "values": "string" } },
+    { "name": "previousAttributes", "type": { "type": "map", "values": "string" } },
+    { "name": "actorHostname", "type": "string" },
+    { "name": "contentURI", "type": "string" },
+    { "name": "previousContentURI", "type": "string" },
+    { "name": "parentIds", "type": { "type": "array", "items": "string" } },
+    { "name": "childIds", "type": { "type": "array", "items": "string" } },
+    { "name": "platform", "type": "string" },
+    { "name": "application", "type": "string" },
+    { "name": "transitUri", "type": ["null", "string"] }
+  ]
+}
+
+
+ + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java index a396ac81da..86cbb74d50 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java @@ -95,7 +95,10 @@ public class TestSiteToSiteProvenanceReportingTask { final List eventsToReturn = new ArrayList<>(); for (int i = (int) Math.max(0, startId); i < (int) (startId + maxRecords) && totalEvents.get() < maxEventId; i++) { - eventsToReturn.add(event); + if (event != null) { + eventsToReturn.add(event); + } + totalEvents.getAndIncrement(); } return eventsToReturn; @@ -304,7 +307,12 @@ public class TestSiteToSiteProvenanceReportingTask { final long maxEventId = 2500; // create the mock reporting task and mock state manager - final MockSiteToSiteProvenanceReportingTask task = new MockSiteToSiteProvenanceReportingTask(); + final Map properties = new HashMap<>(); + for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) { + properties.put(descriptor, descriptor.getDefaultValue()); + } + + final MockSiteToSiteProvenanceReportingTask task = setup(null, properties); final MockStateManager stateManager = new MockStateManager(task); // create the state map and set the last id to the same value as maxEventId @@ -312,10 +320,6 @@ public class TestSiteToSiteProvenanceReportingTask { state.put(SiteToSiteProvenanceReportingTask.LAST_EVENT_ID_KEY, 
String.valueOf(maxEventId)); stateManager.setState(state, Scope.LOCAL); - // setup the mock reporting context to return the mock state manager - final ReportingContext context = Mockito.mock(ReportingContext.class); - Mockito.when(context.getStateManager()).thenReturn(stateManager); - // setup the mock provenance repository to return maxEventId final ProvenanceEventRepository provenanceRepository = Mockito.mock(ProvenanceEventRepository.class); Mockito.doAnswer(new Answer() { @@ -327,15 +331,8 @@ public class TestSiteToSiteProvenanceReportingTask { // setup the mock EventAccess to return the mock provenance repository final EventAccess eventAccess = Mockito.mock(EventAccess.class); - Mockito.when(context.getEventAccess()).thenReturn(eventAccess); Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository); - // setup the mock initialization context - final ComponentLog logger = Mockito.mock(ComponentLog.class); - final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class); - Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()); - Mockito.when(initContext.getLogger()).thenReturn(logger); - task.initialize(initContext); // execute the reporting task and should not produce any data b/c max id same as previous id