NIFI-3985: This closes #1864. Added 'Starting Position' property to SiteToSiteReportingTask; also added additionalDetails.html that explains the schema, and updated the reporting task to stop publishing when the user clicks 'stop' instead of running indefinitely until the reporting task has caught up

Signed-off-by: joewitt <joewitt@apache.org>
Mark Payne 2017-05-26 10:59:47 -04:00 committed by joewitt
parent dd50745a9f
commit 2b435cdfc6
3 changed files with 135 additions and 19 deletions
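The "stop publishing when unscheduled" change described in the commit message reduces to a small pattern, condensed below for orientation. Only the lifecycle annotations and the volatile flag mirror the actual diff that follows; the class name and the two abstract placeholders are hypothetical, and the nifi-api dependency is assumed on the classpath.

// Illustrative sketch only: condenses the stop-on-unschedule pattern this commit adds.
// The lifecycle annotations and the volatile 'scheduled' flag mirror the diff below;
// the class name and the two abstract placeholders are hypothetical.
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.controller.ConfigurationContext;

public abstract class StopAwarePublisherSketch {

    // Flipped by the framework's lifecycle callbacks; volatile so the publishing
    // loop, which runs on a different thread, sees the change promptly.
    private volatile boolean scheduled = false;

    @OnScheduled
    public void onScheduled(final ConfigurationContext context) {
        scheduled = true;
    }

    @OnUnscheduled
    public void onUnscheduled() {
        scheduled = false;
    }

    public boolean isScheduled() {
        return scheduled;
    }

    protected void publishBacklog() {
        // Send batch after batch, but stop as soon as the task is unscheduled
        // instead of draining the entire backlog first.
        while (hasMoreEvents() && isScheduled()) {
            sendNextBatch();
        }
    }

    protected abstract boolean hasMoreEvents();  // hypothetical placeholder
    protected abstract void sendNextBatch();     // hypothetical placeholder
}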

SiteToSiteProvenanceReportingTask.java

@@ -23,6 +23,8 @@ import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
@@ -71,6 +73,11 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
static final String LAST_EVENT_ID_KEY = "last_event_id";
+static final AllowableValue BEGINNING_OF_STREAM = new AllowableValue("beginning-of-stream", "Beginning of Stream",
+"Start reading provenance Events from the beginning of the stream (the oldest event first)");
+static final AllowableValue END_OF_STREAM = new AllowableValue("end-of-stream", "End of Stream",
+"Start reading provenance Events from the end of the stream, ignoring old events");
static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
.name("Platform")
.displayName("Platform")
@@ -83,7 +90,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
static final PropertyDescriptor FILTER_EVENT_TYPE = new PropertyDescriptor.Builder()
.name("s2s-prov-task-event-filter")
.displayName("Event type")
.displayName("Event Type")
.description("Comma-separated list of event types that will be used to filter the provenance events sent by the reporting task. "
+ "Available event types are " + ProvenanceEventType.values() + ". If no filter is set, all the events are sent. If "
+ "multiple filters are set, the filters are cumulative.")
@@ -93,7 +100,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
static final PropertyDescriptor FILTER_COMPONENT_TYPE = new PropertyDescriptor.Builder()
.name("s2s-prov-task-type-filter")
.displayName("Component type")
.displayName("Component Type")
.description("Regular expression to filter the provenance events based on the component type. Only the events matching the regular "
+ "expression will be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
.required(false)
@@ -109,11 +116,21 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder()
+.name("start-position")
+.displayName("Start Position")
+.description("If the Reporting Task has never been run, or if its state has been reset by a user, specifies where in the stream of Provenance Events the Reporting Task should start")
+.allowableValues(BEGINNING_OF_STREAM, END_OF_STREAM)
+.defaultValue(BEGINNING_OF_STREAM.getValue())
+.required(true)
+.build();
private volatile long firstEventId = -1L;
private volatile boolean isFilteringEnabled = false;
private volatile Pattern componentTypeRegex;
private volatile List<ProvenanceEventType> eventTypes = new ArrayList<ProvenanceEventType>();
private volatile List<String> componentIds = new ArrayList<String>();
+private volatile boolean scheduled = false;
@OnScheduled
public void onScheduled(final ConfigurationContext context) throws IOException {
@@ -139,6 +156,17 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
// set a boolean whether filtering will be applied or not
isFilteringEnabled = componentTypeRegex != null || !eventTypes.isEmpty() || !componentIds.isEmpty();
+scheduled = true;
}
+@OnUnscheduled
+public void onUnscheduled() {
+scheduled = false;
+}
+public boolean isScheduled() {
+return scheduled;
+}
@Override
@@ -148,6 +176,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
properties.add(FILTER_EVENT_TYPE);
properties.add(FILTER_COMPONENT_TYPE);
properties.add(FILTER_COMPONENT_ID);
+properties.add(START_POSITION);
return properties;
}
@@ -210,14 +239,27 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
getLogger().error("Failed to get state at start up due to:" + e.getMessage(), e);
return;
}
+final String startPositionValue = context.getProperty(START_POSITION).getValue();
if (state.containsKey(LAST_EVENT_ID_KEY)) {
firstEventId = Long.parseLong(state.get(LAST_EVENT_ID_KEY)) + 1;
+} else {
+if (END_OF_STREAM.getValue().equals(startPositionValue)) {
+firstEventId = currMaxId;
+}
}
-if(currMaxId < (firstEventId - 1)){
-getLogger().warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " +
+if (currMaxId < (firstEventId - 1)) {
+if (BEGINNING_OF_STREAM.getValue().equals(startPositionValue)) {
+getLogger().warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " +
"ids. Restarting querying from the beginning.", new Object[]{currMaxId, firstEventId});
-firstEventId = -1;
+firstEventId = -1;
+} else {
+getLogger().warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " +
+"ids. Restarting querying from the latest event in the Provenance Repository.", new Object[] {currMaxId, firstEventId});
+firstEventId = currMaxId;
+}
}
}
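For readers following the hunk above, the start-position decision can be condensed into a small standalone helper. This is a sketch, not the task's actual code: resolveFirstEventId and the enclosing class are hypothetical names, and the boolean parameter stands in for the END_OF_STREAM check on the new Start Position property.

// Hypothetical helper (not part of the commit) summarizing the start-position logic above.
final class StartPositionSketch {

    private StartPositionSketch() {
    }

    // A null lastEventIdFromState means the task has never run or its state was cleared;
    // startAtEndOfStream stands in for the END_OF_STREAM check on the Start Position property.
    static long resolveFirstEventId(final Long lastEventIdFromState, final boolean startAtEndOfStream, final long currMaxId) {
        long firstEventId = -1L;                      // -1 means "start from the beginning of the stream"
        if (lastEventIdFromState != null) {
            firstEventId = lastEventIdFromState + 1;  // resume just after the last published event
        } else if (startAtEndOfStream) {
            firstEventId = currMaxId;                 // 'End of Stream': skip the existing backlog
        }
        if (currMaxId < firstEventId - 1) {           // provenance ids went backwards: the repository restarted its ids
            firstEventId = startAtEndOfStream ? currMaxId : -1L;
        }
        return firstEventId;
    }
}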
@@ -258,7 +300,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
df.setTimeZone(TimeZone.getTimeZone("Z"));
-while (events != null && !events.isEmpty()) {
+while (events != null && !events.isEmpty() && isScheduled()) {
final long start = System.nanoTime();
// Create a JSON array of all the events in the current batch

additionalDetails.html

@@ -0,0 +1,77 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>SiteToSiteProvenanceReportingTask</title>
<link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
</head>
<body>
<p>
The Site-to-Site Provenance Reporting Task allows the user to publish all of the Provenance Events from a NiFi instance back to
the same NiFi instance or another NiFi instance. This provides a great deal of power because it allows the user to make use of
all of the different Processors that are available in NiFi in order to process or distribute that data. When possible, it is
advisable to send the Provenance data to a different NiFi instance than the one that this Reporting Task is running on, because
when the data is received over Site-to-Site and processed, that processing itself generates Provenance events. As a result,
a cycle is created. However, the data is sent in batches (1,000 by default). This means that for each batch of Provenance events
that are sent back to NiFi, the receiving NiFi will have to generate only a single event per component.
</p>
<p>
When published to a NiFi instance, the Provenance data is sent as a JSON array. Quite often, it can be useful to work with this data using
a schema. As such, the schema for this Provenance data can be defined as follows:
</p>
<pre>
<code>
{
"namespace": "nifi",
"name": "provenanceEvent",
"type": "record",
"fields": [
{ "name": "eventId", "type": "string" },
{ "name": "eventOrdinal", "type": "long" },
{ "name": "eventType", "type": "string" },
{ "name": "timestampMillis", "type": "long" },
{ "name": "durationMillis", "type": "long" },
{ "name": "lineageStart", "type": { "type": "long", "logicalType": "timestamp-millis" } },
{ "name": "details", "type": "string" },
{ "name": "componentId", "type": "string" },
{ "name": "componentType", "type": "string" },
{ "name": "entityId", "type": "string" },
{ "name": "entityType", "type": "string" },
{ "name": "entitySize", "type": ["null", "long"] },
{ "name": "previousEntitySize", "type": ["null", "long"] },
{ "name": "updatedAttributes", "type": { "type": "map", "values": "string" } },
{ "name": "previousAttributes", "type": { "type": "map", "values": "string" } },
{ "name": "actorHostname", "type": "string" },
{ "name": "contentURI", "type": "string" },
{ "name": "previousContentURI", "type": "string" },
{ "name": "parentIds", "type": { "type": "array", "items": "string" } },
{ "name": "childIds", "type": { "type": "array", "items": "string" } },
{ "name": "platform", "type": "string" },
{ "name": "application", "type": "string" },
{ "name": "transitUri", "type": ["null", "string"] }
]
}
</code>
</pre>
</body>
</html>
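As a usage illustration only (not part of the commit), the sketch below reads a fabricated one-event JSON array with Jackson and pulls out a few of the fields named in the schema above. Jackson databind is an assumption here, not something the Reporting Task requires.

// Illustrative only: parse the JSON array the reporting task publishes and read a few
// fields named in the schema above. Assumes Jackson databind is on the classpath; the
// sample payload is fabricated.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ProvenanceEventJsonExample {
    public static void main(final String[] args) throws Exception {
        final String json = "[{\"eventId\":\"event-1\",\"eventType\":\"SEND\","
                + "\"componentId\":\"proc-1234\",\"componentType\":\"PutFile\","
                + "\"timestampMillis\":1495810787000,\"entitySize\":1024}]";

        final ObjectMapper mapper = new ObjectMapper();
        for (final JsonNode event : mapper.readTree(json)) {
            // Each array element is one provenance event record.
            System.out.println(event.path("eventType").asText()
                    + " from " + event.path("componentType").asText()
                    + " (" + event.path("componentId").asText() + "), "
                    + event.path("entitySize").asLong(-1) + " bytes");
        }
    }
}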

TestSiteToSiteProvenanceReportingTask.java

@@ -95,7 +95,10 @@ public class TestSiteToSiteProvenanceReportingTask {
final List<ProvenanceEventRecord> eventsToReturn = new ArrayList<>();
for (int i = (int) Math.max(0, startId); i < (int) (startId + maxRecords) && totalEvents.get() < maxEventId; i++) {
-eventsToReturn.add(event);
+if (event != null) {
+eventsToReturn.add(event);
+}
totalEvents.getAndIncrement();
}
return eventsToReturn;
@@ -304,7 +307,12 @@ public class TestSiteToSiteProvenanceReportingTask {
final long maxEventId = 2500;
// create the mock reporting task and mock state manager
-final MockSiteToSiteProvenanceReportingTask task = new MockSiteToSiteProvenanceReportingTask();
+final Map<PropertyDescriptor, String> properties = new HashMap<>();
+for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
+properties.put(descriptor, descriptor.getDefaultValue());
+}
+final MockSiteToSiteProvenanceReportingTask task = setup(null, properties);
final MockStateManager stateManager = new MockStateManager(task);
// create the state map and set the last id to the same value as maxEventId
@@ -312,10 +320,6 @@ public class TestSiteToSiteProvenanceReportingTask {
state.put(SiteToSiteProvenanceReportingTask.LAST_EVENT_ID_KEY, String.valueOf(maxEventId));
stateManager.setState(state, Scope.LOCAL);
-// setup the mock reporting context to return the mock state manager
-final ReportingContext context = Mockito.mock(ReportingContext.class);
-Mockito.when(context.getStateManager()).thenReturn(stateManager);
// setup the mock provenance repository to return maxEventId
final ProvenanceEventRepository provenanceRepository = Mockito.mock(ProvenanceEventRepository.class);
Mockito.doAnswer(new Answer<Long>() {
@@ -327,15 +331,8 @@ public class TestSiteToSiteProvenanceReportingTask {
// setup the mock EventAccess to return the mock provenance repository
final EventAccess eventAccess = Mockito.mock(EventAccess.class);
-Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
-// setup the mock initialization context
-final ComponentLog logger = Mockito.mock(ComponentLog.class);
-final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
-Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
-Mockito.when(initContext.getLogger()).thenReturn(logger);
-task.initialize(initContext);
// execute the reporting task and should not produce any data b/c max id same as previous id