mirror of https://github.com/apache/nifi.git
NIFI-4468: If an entire batch of Provenance Events are read by the Site-to-Site Provenance Reporting Task and none of them match the filters, then the reporting did not update its state, so it would be stuck in this cycle indefinitely. Made fix so that if any event is read from the provenance repository, regardless of whether or not it matches the filters, we update the state to keep track of what has been processed
This closes #2198. Signed-off-by: Joe Skora <jskora@apache.org>
This commit is contained in:
parent
6201c06c99
commit
acf05e0636
|
@ -272,15 +272,17 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
|
|||
return;
|
||||
}
|
||||
|
||||
List<ProvenanceEventRecord> events;
|
||||
List<ProvenanceEventRecord> rawEvents;
|
||||
List<ProvenanceEventRecord> filteredEvents;
|
||||
try {
|
||||
events = filterEvents(context.getEventAccess().getProvenanceEvents(firstEventId, context.getProperty(BATCH_SIZE).asInteger()));
|
||||
rawEvents = context.getEventAccess().getProvenanceEvents(firstEventId, context.getProperty(BATCH_SIZE).asInteger());
|
||||
filteredEvents = filterEvents(rawEvents);
|
||||
} catch (final IOException ioe) {
|
||||
getLogger().error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe);
|
||||
return;
|
||||
}
|
||||
|
||||
if (events == null || events.isEmpty()) {
|
||||
if (rawEvents == null || rawEvents.isEmpty()) {
|
||||
getLogger().debug("No events to send due to 'events' being null or empty.");
|
||||
return;
|
||||
}
|
||||
|
@ -304,12 +306,13 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
|
|||
final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
|
||||
df.setTimeZone(TimeZone.getTimeZone("Z"));
|
||||
|
||||
while (events != null && !events.isEmpty() && isScheduled()) {
|
||||
while (rawEvents != null && !rawEvents.isEmpty() && isScheduled()) {
|
||||
final long start = System.nanoTime();
|
||||
|
||||
if (!filteredEvents.isEmpty()) {
|
||||
// Create a JSON array of all the events in the current batch
|
||||
final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
|
||||
for (final ProvenanceEventRecord event : events) {
|
||||
for (final ProvenanceEventRecord event : filteredEvents) {
|
||||
final String componentName = componentMap.get(event.getComponentId());
|
||||
arrayBuilder.add(serialize(factory, builder, event, df, componentName, hostname, url, rootGroupName, platform, nodeId));
|
||||
}
|
||||
|
@ -335,16 +338,34 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
|
|||
|
||||
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
||||
getLogger().info("Successfully sent {} Provenance Events to destination in {} ms; Transaction ID = {}; First Event ID = {}",
|
||||
new Object[]{events.size(), transferMillis, transactionId, events.get(0).getEventId()});
|
||||
new Object[] {filteredEvents.size(), transferMillis, transactionId, rawEvents.get(0).getEventId()});
|
||||
} catch (final IOException e) {
|
||||
throw new ProcessException("Failed to send Provenance Events to destination due to IOException:" + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
firstEventId = updateLastEventId(rawEvents, context.getStateManager());
|
||||
|
||||
// Retrieve the next batch
|
||||
try {
|
||||
rawEvents = context.getEventAccess().getProvenanceEvents(firstEventId, context.getProperty(BATCH_SIZE).asInteger());
|
||||
filteredEvents = filterEvents(rawEvents);
|
||||
} catch (final IOException ioe) {
|
||||
getLogger().error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private long updateLastEventId(final List<ProvenanceEventRecord> events, final StateManager stateManager) {
|
||||
if (events == null || events.isEmpty()) {
|
||||
return firstEventId;
|
||||
}
|
||||
|
||||
// Store the id of the last event so we know where we left off
|
||||
final ProvenanceEventRecord lastEvent = events.get(events.size() - 1);
|
||||
final String lastEventId = String.valueOf(lastEvent.getEventId());
|
||||
try {
|
||||
StateManager stateManager = context.getStateManager();
|
||||
Map<String, String> newMapOfState = new HashMap<>();
|
||||
newMapOfState.put(LAST_EVENT_ID_KEY, lastEventId);
|
||||
stateManager.setState(newMapOfState, Scope.LOCAL);
|
||||
|
@ -353,20 +374,14 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
|
|||
new Object[] {lastEventId, ioe, ioe, ioe.getMessage()}, ioe);
|
||||
}
|
||||
|
||||
firstEventId = lastEvent.getEventId() + 1;
|
||||
|
||||
// Retrieve the next batch
|
||||
try {
|
||||
events = filterEvents(context.getEventAccess().getProvenanceEvents(firstEventId, context.getProperty(BATCH_SIZE).asInteger()));
|
||||
} catch (final IOException ioe) {
|
||||
getLogger().error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe);
|
||||
return;
|
||||
}
|
||||
return lastEvent.getEventId() + 1;
|
||||
}
|
||||
|
||||
private List<ProvenanceEventRecord> filterEvents(final List<ProvenanceEventRecord> provenanceEvents) {
|
||||
if (provenanceEvents == null || provenanceEvents.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
private List<ProvenanceEventRecord> filterEvents(List<ProvenanceEventRecord> provenanceEvents) {
|
||||
if(isFilteringEnabled) {
|
||||
List<ProvenanceEventRecord> filteredEvents = new ArrayList<ProvenanceEventRecord>();
|
||||
|
||||
|
|
Loading…
Reference in New Issue