mirror of https://github.com/apache/nifi.git
NIFI-4547: Add ProvenanceEventConsumer utility class
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #2236.
This commit is contained in:
parent
fb94e983b6
commit
d914ad2924
|
@ -0,0 +1,45 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>nifi-extension-utils</artifactId>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<version>1.5.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>nifi-reporting-utils</artifactId>
|
||||
<version>1.5.0-SNAPSHOT</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-utils</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,244 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.reporting.util.provenance;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.state.Scope;
|
||||
import org.apache.nifi.components.state.StateManager;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.provenance.ProvenanceEventRecord;
|
||||
import org.apache.nifi.provenance.ProvenanceEventType;
|
||||
import org.apache.nifi.reporting.EventAccess;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
public class ProvenanceEventConsumer {
|
||||
|
||||
public static final String LAST_EVENT_ID_KEY = "last_event_id";
|
||||
|
||||
public static final AllowableValue BEGINNING_OF_STREAM = new AllowableValue("beginning-of-stream", "Beginning of Stream",
|
||||
"Start reading provenance Events from the beginning of the stream (the oldest event first)");
|
||||
public static final AllowableValue END_OF_STREAM = new AllowableValue("end-of-stream", "End of Stream",
|
||||
"Start reading provenance Events from the end of the stream, ignoring old events");
|
||||
public static final PropertyDescriptor PROVENANCE_START_POSITION = new PropertyDescriptor.Builder()
|
||||
.name("provenance-start-position")
|
||||
.displayName("Provenance Record Start Position")
|
||||
.description("If the Reporting Task has never been run, or if its state has been reset by a user, specifies where in the stream of Provenance Events the Reporting Task should start")
|
||||
.allowableValues(BEGINNING_OF_STREAM, END_OF_STREAM)
|
||||
.defaultValue(BEGINNING_OF_STREAM.getValue())
|
||||
.required(true)
|
||||
.build();
|
||||
public static final PropertyDescriptor PROVENANCE_BATCH_SIZE = new PropertyDescriptor.Builder()
|
||||
.name("provenance-batch-size")
|
||||
.displayName("Provenance Record Batch Size")
|
||||
.description("Specifies how many records to send in a single batch, at most.")
|
||||
.required(true)
|
||||
.defaultValue("1000")
|
||||
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||
.build();
|
||||
|
||||
|
||||
private String startPositionValue = PROVENANCE_START_POSITION.getDefaultValue();
|
||||
private Pattern componentTypeRegex;
|
||||
private List<ProvenanceEventType> eventTypes = new ArrayList<ProvenanceEventType>();
|
||||
private List<String> componentIds = new ArrayList<String>();
|
||||
private int batchSize = Integer.parseInt(PROVENANCE_BATCH_SIZE.getDefaultValue());
|
||||
|
||||
private volatile long firstEventId = -1L;
|
||||
private volatile boolean scheduled = false;
|
||||
|
||||
private ComponentLog logger;
|
||||
|
||||
public void setStartPositionValue(String startPositionValue) {
|
||||
this.startPositionValue = startPositionValue;
|
||||
}
|
||||
|
||||
public void setBatchSize(int batchSize) {
|
||||
this.batchSize = batchSize;
|
||||
}
|
||||
|
||||
public void setComponentTypeRegex(final String componentTypeRegex) {
|
||||
if (!StringUtils.isBlank(componentTypeRegex)) {
|
||||
this.componentTypeRegex = Pattern.compile(componentTypeRegex);
|
||||
}
|
||||
}
|
||||
|
||||
public void addTargetEventType(final ProvenanceEventType ... types) {
|
||||
for (ProvenanceEventType type : types) {
|
||||
eventTypes.add(type);
|
||||
}
|
||||
}
|
||||
|
||||
public void addTargetComponentId(final String ... ids) {
|
||||
for (String id : ids) {
|
||||
componentIds.add(id);
|
||||
}
|
||||
}
|
||||
|
||||
public void setScheduled(boolean scheduled) {
|
||||
this.scheduled = scheduled;
|
||||
}
|
||||
|
||||
public boolean isScheduled() {
|
||||
return scheduled;
|
||||
}
|
||||
|
||||
public void setLogger(ComponentLog logger) {
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
public void consumeEvents(final EventAccess eventAccess, final StateManager stateManager,
|
||||
final Consumer<List<ProvenanceEventRecord>> consumer) throws ProcessException {
|
||||
|
||||
Long currMaxId = eventAccess.getProvenanceRepository().getMaxEventId();
|
||||
|
||||
if(currMaxId == null) {
|
||||
logger.debug("No events to send because no events have been created yet.");
|
||||
return;
|
||||
}
|
||||
|
||||
if (firstEventId < 0) {
|
||||
Map<String, String> state;
|
||||
try {
|
||||
state = stateManager.getState(Scope.LOCAL).toMap();
|
||||
} catch (IOException e) {
|
||||
logger.error("Failed to get state at start up due to:" + e.getMessage(), e);
|
||||
return;
|
||||
}
|
||||
|
||||
if (state.containsKey(LAST_EVENT_ID_KEY)) {
|
||||
firstEventId = Long.parseLong(state.get(LAST_EVENT_ID_KEY)) + 1;
|
||||
} else {
|
||||
if (END_OF_STREAM.getValue().equals(startPositionValue)) {
|
||||
firstEventId = currMaxId;
|
||||
}
|
||||
}
|
||||
|
||||
if (currMaxId < (firstEventId - 1)) {
|
||||
if (BEGINNING_OF_STREAM.getValue().equals(startPositionValue)) {
|
||||
logger.warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " +
|
||||
"ids. Restarting querying from the beginning.", new Object[]{currMaxId, firstEventId});
|
||||
firstEventId = -1;
|
||||
} else {
|
||||
logger.warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " +
|
||||
"ids. Restarting querying from the latest event in the Provenance Repository.", new Object[] {currMaxId, firstEventId});
|
||||
firstEventId = currMaxId;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (currMaxId == (firstEventId - 1)) {
|
||||
logger.debug("No events to send due to the current max id being equal to the last id that was queried.");
|
||||
return;
|
||||
}
|
||||
|
||||
List<ProvenanceEventRecord> rawEvents;
|
||||
List<ProvenanceEventRecord> filteredEvents;
|
||||
try {
|
||||
rawEvents = eventAccess.getProvenanceEvents(firstEventId, batchSize);
|
||||
filteredEvents = filterEvents(rawEvents);
|
||||
} catch (final IOException ioe) {
|
||||
logger.error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe);
|
||||
return;
|
||||
}
|
||||
|
||||
if (rawEvents == null || rawEvents.isEmpty()) {
|
||||
logger.debug("No events to send due to 'events' being null or empty.");
|
||||
return;
|
||||
}
|
||||
|
||||
// Consume while there are more events and not stopped.
|
||||
while (rawEvents != null && !rawEvents.isEmpty() && isScheduled()) {
|
||||
|
||||
if (!filteredEvents.isEmpty()) {
|
||||
// Executes callback.
|
||||
consumer.accept(filteredEvents);
|
||||
}
|
||||
|
||||
firstEventId = updateLastEventId(rawEvents, stateManager);
|
||||
|
||||
// Retrieve the next batch
|
||||
try {
|
||||
rawEvents = eventAccess.getProvenanceEvents(firstEventId, batchSize);
|
||||
filteredEvents = filterEvents(rawEvents);
|
||||
} catch (final IOException ioe) {
|
||||
logger.error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private long updateLastEventId(final List<ProvenanceEventRecord> events, final StateManager stateManager) {
|
||||
if (events == null || events.isEmpty()) {
|
||||
return firstEventId;
|
||||
}
|
||||
|
||||
// Store the id of the last event so we know where we left off
|
||||
final ProvenanceEventRecord lastEvent = events.get(events.size() - 1);
|
||||
final String lastEventId = String.valueOf(lastEvent.getEventId());
|
||||
try {
|
||||
Map<String, String> newMapOfState = new HashMap<>();
|
||||
newMapOfState.put(LAST_EVENT_ID_KEY, lastEventId);
|
||||
stateManager.setState(newMapOfState, Scope.LOCAL);
|
||||
} catch (final IOException ioe) {
|
||||
logger.error("Failed to update state to {} due to {}; this could result in events being re-sent after a restart. The message of {} was: {}",
|
||||
new Object[] {lastEventId, ioe, ioe, ioe.getMessage()}, ioe);
|
||||
}
|
||||
|
||||
return lastEvent.getEventId() + 1;
|
||||
}
|
||||
|
||||
|
||||
private boolean isFilteringEnabled() {
|
||||
return componentTypeRegex != null || !eventTypes.isEmpty() || !componentIds.isEmpty();
|
||||
}
|
||||
|
||||
private List<ProvenanceEventRecord> filterEvents(List<ProvenanceEventRecord> provenanceEvents) {
|
||||
if(isFilteringEnabled()) {
|
||||
List<ProvenanceEventRecord> filteredEvents = new ArrayList<ProvenanceEventRecord>();
|
||||
|
||||
for (ProvenanceEventRecord provenanceEventRecord : provenanceEvents) {
|
||||
if(!componentIds.isEmpty() && !componentIds.contains(provenanceEventRecord.getComponentId())) {
|
||||
continue;
|
||||
}
|
||||
if(!eventTypes.isEmpty() && !eventTypes.contains(provenanceEventRecord.getEventType())) {
|
||||
continue;
|
||||
}
|
||||
if(componentTypeRegex != null && !componentTypeRegex.matcher(provenanceEventRecord.getComponentType()).matches()) {
|
||||
continue;
|
||||
}
|
||||
filteredEvents.add(provenanceEventRecord);
|
||||
}
|
||||
|
||||
return filteredEvents;
|
||||
} else {
|
||||
return provenanceEvents;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -30,6 +30,7 @@
|
|||
<module>nifi-record-utils</module>
|
||||
<module>nifi-hadoop-utils</module>
|
||||
<module>nifi-processor-utils</module>
|
||||
<module>nifi-reporting-utils</module>
|
||||
</modules>
|
||||
|
||||
</project>
|
||||
|
|
|
@ -34,6 +34,10 @@
|
|||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-processor-utils</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-reporting-utils</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-utils</artifactId>
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
|
|||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.state.Scope;
|
||||
import org.apache.nifi.components.state.StateManager;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.controller.status.PortStatus;
|
||||
import org.apache.nifi.controller.status.ProcessGroupStatus;
|
||||
|
@ -39,6 +38,7 @@ import org.apache.nifi.provenance.ProvenanceEventRecord;
|
|||
import org.apache.nifi.provenance.ProvenanceEventType;
|
||||
import org.apache.nifi.remote.Transaction;
|
||||
import org.apache.nifi.remote.TransferDirection;
|
||||
import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
|
||||
|
||||
import javax.json.Json;
|
||||
import javax.json.JsonArray;
|
||||
|
@ -62,7 +62,6 @@ import java.util.Map;
|
|||
import java.util.TimeZone;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
@Tags({"provenance", "lineage", "tracking", "site", "site to site", "restricted"})
|
||||
@CapabilityDescription("Publishes Provenance events using the Site To Site protocol.")
|
||||
|
@ -125,52 +124,43 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
|
|||
.required(true)
|
||||
.build();
|
||||
|
||||
private volatile long firstEventId = -1L;
|
||||
private volatile boolean isFilteringEnabled = false;
|
||||
private volatile Pattern componentTypeRegex;
|
||||
private volatile List<ProvenanceEventType> eventTypes = new ArrayList<ProvenanceEventType>();
|
||||
private volatile List<String> componentIds = new ArrayList<String>();
|
||||
private volatile boolean scheduled = false;
|
||||
private volatile ProvenanceEventConsumer consumer;
|
||||
|
||||
@OnScheduled
|
||||
public void onScheduled(final ConfigurationContext context) throws IOException {
|
||||
// initialize component type filtering
|
||||
componentTypeRegex = StringUtils.isBlank(context.getProperty(FILTER_COMPONENT_TYPE).getValue()) ? null : Pattern.compile(context.getProperty(FILTER_COMPONENT_TYPE).getValue());
|
||||
consumer = new ProvenanceEventConsumer();
|
||||
consumer.setStartPositionValue(context.getProperty(START_POSITION).getValue());
|
||||
consumer.setBatchSize(context.getProperty(BATCH_SIZE).asInteger());
|
||||
consumer.setLogger(getLogger());
|
||||
|
||||
final String[] eventList = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_EVENT_TYPE).getValue(), ','));
|
||||
if(eventList != null) {
|
||||
for(String type : eventList) {
|
||||
// initialize component type filtering
|
||||
consumer.setComponentTypeRegex(context.getProperty(FILTER_COMPONENT_TYPE).getValue());
|
||||
|
||||
final String[] targetEventTypes = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_EVENT_TYPE).getValue(), ','));
|
||||
if(targetEventTypes != null) {
|
||||
for(String type : targetEventTypes) {
|
||||
try {
|
||||
eventTypes.add(ProvenanceEventType.valueOf(type));
|
||||
consumer.addTargetEventType(ProvenanceEventType.valueOf(type));
|
||||
} catch (Exception e) {
|
||||
getLogger().warn(type + " is not a correct event type, removed from the filtering.");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
eventTypes.clear();
|
||||
}
|
||||
|
||||
// initialize component ID filtering
|
||||
final String[] componentIdList = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_COMPONENT_ID).getValue(), ','));
|
||||
if(componentIdList != null) {
|
||||
componentIds.addAll(Arrays.asList(componentIdList));
|
||||
} else {
|
||||
componentIds.clear();
|
||||
final String[] targetComponentIds = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_COMPONENT_ID).getValue(), ','));
|
||||
if(targetComponentIds != null) {
|
||||
consumer.addTargetComponentId(targetComponentIds);
|
||||
}
|
||||
|
||||
// set a boolean whether filtering will be applied or not
|
||||
isFilteringEnabled = componentTypeRegex != null || !eventTypes.isEmpty() || !componentIds.isEmpty();
|
||||
|
||||
scheduled = true;
|
||||
consumer.setScheduled(true);
|
||||
}
|
||||
|
||||
@OnUnscheduled
|
||||
public void onUnscheduled() {
|
||||
scheduled = false;
|
||||
}
|
||||
|
||||
public boolean isScheduled() {
|
||||
return scheduled;
|
||||
if (consumer != null) {
|
||||
consumer.setScheduled(false);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -228,65 +218,6 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
|
|||
final String rootGroupName = procGroupStatus == null ? null : procGroupStatus.getName();
|
||||
final Map<String,String> componentMap = createComponentMap(procGroupStatus);
|
||||
|
||||
Long currMaxId = context.getEventAccess().getProvenanceRepository().getMaxEventId();
|
||||
|
||||
if(currMaxId == null) {
|
||||
getLogger().debug("No events to send because no events have been created yet.");
|
||||
return;
|
||||
}
|
||||
|
||||
if (firstEventId < 0) {
|
||||
Map<String, String> state;
|
||||
try {
|
||||
state = context.getStateManager().getState(Scope.LOCAL).toMap();
|
||||
} catch (IOException e) {
|
||||
getLogger().error("Failed to get state at start up due to:" + e.getMessage(), e);
|
||||
return;
|
||||
}
|
||||
|
||||
final String startPositionValue = context.getProperty(START_POSITION).getValue();
|
||||
|
||||
if (state.containsKey(LAST_EVENT_ID_KEY)) {
|
||||
firstEventId = Long.parseLong(state.get(LAST_EVENT_ID_KEY)) + 1;
|
||||
} else {
|
||||
if (END_OF_STREAM.getValue().equals(startPositionValue)) {
|
||||
firstEventId = currMaxId;
|
||||
}
|
||||
}
|
||||
|
||||
if (currMaxId < (firstEventId - 1)) {
|
||||
if (BEGINNING_OF_STREAM.getValue().equals(startPositionValue)) {
|
||||
getLogger().warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " +
|
||||
"ids. Restarting querying from the beginning.", new Object[]{currMaxId, firstEventId});
|
||||
firstEventId = -1;
|
||||
} else {
|
||||
getLogger().warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " +
|
||||
"ids. Restarting querying from the latest event in the Provenance Repository.", new Object[] {currMaxId, firstEventId});
|
||||
firstEventId = currMaxId;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (currMaxId == (firstEventId - 1)) {
|
||||
getLogger().debug("No events to send due to the current max id being equal to the last id that was queried.");
|
||||
return;
|
||||
}
|
||||
|
||||
List<ProvenanceEventRecord> rawEvents;
|
||||
List<ProvenanceEventRecord> filteredEvents;
|
||||
try {
|
||||
rawEvents = context.getEventAccess().getProvenanceEvents(firstEventId, context.getProperty(BATCH_SIZE).asInteger());
|
||||
filteredEvents = filterEvents(rawEvents);
|
||||
} catch (final IOException ioe) {
|
||||
getLogger().error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe);
|
||||
return;
|
||||
}
|
||||
|
||||
if (rawEvents == null || rawEvents.isEmpty()) {
|
||||
getLogger().debug("No events to send due to 'events' being null or empty.");
|
||||
return;
|
||||
}
|
||||
|
||||
final String nifiUrl = context.getProperty(INSTANCE_URL).evaluateAttributeExpressions().getValue();
|
||||
URL url;
|
||||
try {
|
||||
|
@ -306,103 +237,44 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
|
|||
final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
|
||||
df.setTimeZone(TimeZone.getTimeZone("Z"));
|
||||
|
||||
while (rawEvents != null && !rawEvents.isEmpty() && isScheduled()) {
|
||||
consumer.consumeEvents(context.getEventAccess(), context.getStateManager(), events -> {
|
||||
final long start = System.nanoTime();
|
||||
|
||||
if (!filteredEvents.isEmpty()) {
|
||||
// Create a JSON array of all the events in the current batch
|
||||
final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
|
||||
for (final ProvenanceEventRecord event : filteredEvents) {
|
||||
final String componentName = componentMap.get(event.getComponentId());
|
||||
arrayBuilder.add(serialize(factory, builder, event, df, componentName, hostname, url, rootGroupName, platform, nodeId));
|
||||
}
|
||||
final JsonArray jsonArray = arrayBuilder.build();
|
||||
|
||||
// Send the JSON document for the current batch
|
||||
try {
|
||||
final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
|
||||
if (transaction == null) {
|
||||
getLogger().debug("All destination nodes are penalized; will attempt to send data later");
|
||||
return;
|
||||
}
|
||||
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
final String transactionId = UUID.randomUUID().toString();
|
||||
attributes.put("reporting.task.transaction.id", transactionId);
|
||||
attributes.put("mime.type", "application/json");
|
||||
|
||||
final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8);
|
||||
transaction.send(data, attributes);
|
||||
transaction.confirm();
|
||||
transaction.complete();
|
||||
|
||||
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
||||
getLogger().info("Successfully sent {} Provenance Events to destination in {} ms; Transaction ID = {}; First Event ID = {}",
|
||||
new Object[] {filteredEvents.size(), transferMillis, transactionId, rawEvents.get(0).getEventId()});
|
||||
} catch (final IOException e) {
|
||||
throw new ProcessException("Failed to send Provenance Events to destination due to IOException:" + e.getMessage(), e);
|
||||
}
|
||||
// Create a JSON array of all the events in the current batch
|
||||
final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
|
||||
for (final ProvenanceEventRecord event : events) {
|
||||
final String componentName = componentMap.get(event.getComponentId());
|
||||
arrayBuilder.add(serialize(factory, builder, event, df, componentName, hostname, url, rootGroupName, platform, nodeId));
|
||||
}
|
||||
final JsonArray jsonArray = arrayBuilder.build();
|
||||
|
||||
firstEventId = updateLastEventId(rawEvents, context.getStateManager());
|
||||
|
||||
// Retrieve the next batch
|
||||
// Send the JSON document for the current batch
|
||||
try {
|
||||
rawEvents = context.getEventAccess().getProvenanceEvents(firstEventId, context.getProperty(BATCH_SIZE).asInteger());
|
||||
filteredEvents = filterEvents(rawEvents);
|
||||
} catch (final IOException ioe) {
|
||||
getLogger().error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe);
|
||||
return;
|
||||
final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
|
||||
if (transaction == null) {
|
||||
getLogger().debug("All destination nodes are penalized; will attempt to send data later");
|
||||
return;
|
||||
}
|
||||
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
final String transactionId = UUID.randomUUID().toString();
|
||||
attributes.put("reporting.task.transaction.id", transactionId);
|
||||
attributes.put("mime.type", "application/json");
|
||||
|
||||
final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8);
|
||||
transaction.send(data, attributes);
|
||||
transaction.confirm();
|
||||
transaction.complete();
|
||||
|
||||
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
||||
getLogger().info("Successfully sent {} Provenance Events to destination in {} ms; Transaction ID = {}; First Event ID = {}",
|
||||
new Object[] {events.size(), transferMillis, transactionId, events.get(0).getEventId()});
|
||||
} catch (final IOException e) {
|
||||
throw new ProcessException("Failed to send Provenance Events to destination due to IOException:" + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
private long updateLastEventId(final List<ProvenanceEventRecord> events, final StateManager stateManager) {
|
||||
if (events == null || events.isEmpty()) {
|
||||
return firstEventId;
|
||||
}
|
||||
|
||||
// Store the id of the last event so we know where we left off
|
||||
final ProvenanceEventRecord lastEvent = events.get(events.size() - 1);
|
||||
final String lastEventId = String.valueOf(lastEvent.getEventId());
|
||||
try {
|
||||
Map<String, String> newMapOfState = new HashMap<>();
|
||||
newMapOfState.put(LAST_EVENT_ID_KEY, lastEventId);
|
||||
stateManager.setState(newMapOfState, Scope.LOCAL);
|
||||
} catch (final IOException ioe) {
|
||||
getLogger().error("Failed to update state to {} due to {}; this could result in events being re-sent after a restart. The message of {} was: {}",
|
||||
new Object[] {lastEventId, ioe, ioe, ioe.getMessage()}, ioe);
|
||||
}
|
||||
|
||||
return lastEvent.getEventId() + 1;
|
||||
}
|
||||
|
||||
private List<ProvenanceEventRecord> filterEvents(final List<ProvenanceEventRecord> provenanceEvents) {
|
||||
if (provenanceEvents == null || provenanceEvents.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
if(isFilteringEnabled) {
|
||||
List<ProvenanceEventRecord> filteredEvents = new ArrayList<ProvenanceEventRecord>();
|
||||
|
||||
for (ProvenanceEventRecord provenanceEventRecord : provenanceEvents) {
|
||||
if(!componentIds.isEmpty() && !componentIds.contains(provenanceEventRecord.getComponentId())) {
|
||||
continue;
|
||||
}
|
||||
if(!eventTypes.isEmpty() && !eventTypes.contains(provenanceEventRecord.getEventType())) {
|
||||
continue;
|
||||
}
|
||||
if(componentTypeRegex != null && !componentTypeRegex.matcher(provenanceEventRecord.getComponentType()).matches()) {
|
||||
continue;
|
||||
}
|
||||
filteredEvents.add(provenanceEventRecord);
|
||||
}
|
||||
|
||||
return filteredEvents;
|
||||
} else {
|
||||
return provenanceEvents;
|
||||
}
|
||||
}
|
||||
|
||||
static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final ProvenanceEventRecord event, final DateFormat df,
|
||||
final String componentName, final String hostname, final URL nifiUrl, final String applicationName, final String platform, final String nodeIdentifier) {
|
||||
|
|
|
@ -336,6 +336,7 @@ public class TestSiteToSiteProvenanceReportingTask {
|
|||
task.initialize(initContext);
|
||||
|
||||
// execute the reporting task and should not produce any data b/c max id same as previous id
|
||||
task.onScheduled(confContext);
|
||||
task.onTrigger(context);
|
||||
assertEquals(0, task.dataSent.size());
|
||||
}
|
||||
|
|
5
pom.xml
5
pom.xml
|
@ -1484,6 +1484,11 @@
|
|||
<artifactId>nifi-processor-utils</artifactId>
|
||||
<version>1.5.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-reporting-utils</artifactId>
|
||||
<version>1.5.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-schema-utils</artifactId>
|
||||
|
|
Loading…
Reference in New Issue