NIFI-10: Added FETCH and DOWNLOAD Provenance Events; updated FlowController to use DOWNLOAD event instead of SEND whenever a user downloads/views content via Provenance Event

This commit is contained in:
Mark Payne 2015-10-25 11:53:46 -04:00
parent 51f564024a
commit fc2aa2764c
2 changed files with 69 additions and 66 deletions

View File

@ -46,6 +46,11 @@ public enum ProvenanceEventType {
*/
SEND,
/**
* Indicates that the contents of a FlowFile were downloaded by a user or external entity.
*/
DOWNLOAD,
/**
* Indicates a provenance event for the conclusion of an object's life for
* some reason other than object expiration

View File

@ -216,7 +216,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public static final String SCHEDULE_MINIMUM_NANOSECONDS = "flowcontroller.minimum.nanoseconds";
public static final String GRACEFUL_SHUTDOWN_PERIOD = "nifi.flowcontroller.graceful.shutdown.seconds";
public static final long DEFAULT_GRACEFUL_SHUTDOWN_SECONDS = 10;
public static final int METRICS_RESERVOIR_SIZE = 288; // 1 day worth of 5-minute captures
public static final int METRICS_RESERVOIR_SIZE = 288; // 1 day worth of 5-minute captures
public static final String ROOT_GROUP_ID_ALIAS = "root";
public static final String DEFAULT_ROOT_GROUP_NAME = "NiFi Flow";
@ -245,7 +245,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private final UserService userService;
private final EventDrivenWorkerQueue eventDrivenWorkerQueue;
private final ComponentStatusRepository componentStatusRepository;
private final long systemStartTime = System.currentTimeMillis(); // time at which the node was started
private final long systemStartTime = System.currentTimeMillis(); // time at which the node was started
private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = new ConcurrentHashMap<>();
// The Heartbeat Bean is used to provide an Atomic Reference to data that is used in heartbeats that may
@ -336,38 +336,38 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
private FlowFileSwapManager flowFileSwapManager; // guarded by read/write lock
private FlowFileSwapManager flowFileSwapManager; // guarded by read/write lock
private static final Logger LOG = LoggerFactory.getLogger(FlowController.class);
private static final Logger heartbeatLogger = LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat");
public static FlowController createStandaloneInstance(
final FlowFileEventRepository flowFileEventRepo,
final NiFiProperties properties,
final UserService userService,
final StringEncryptor encryptor) {
final FlowFileEventRepository flowFileEventRepo,
final NiFiProperties properties,
final UserService userService,
final StringEncryptor encryptor) {
return new FlowController(
flowFileEventRepo,
properties,
userService,
encryptor,
/* configuredForClustering */ false,
/* NodeProtocolSender */ null);
flowFileEventRepo,
properties,
userService,
encryptor,
/* configuredForClustering */ false,
/* NodeProtocolSender */ null);
}
public static FlowController createClusteredInstance(
final FlowFileEventRepository flowFileEventRepo,
final NiFiProperties properties,
final UserService userService,
final StringEncryptor encryptor,
final NodeProtocolSender protocolSender) {
final FlowFileEventRepository flowFileEventRepo,
final NiFiProperties properties,
final UserService userService,
final StringEncryptor encryptor,
final NodeProtocolSender protocolSender) {
final FlowController flowController = new FlowController(
flowFileEventRepo,
properties,
userService,
encryptor,
/* configuredForClustering */ true,
/* NodeProtocolSender */ protocolSender);
flowFileEventRepo,
properties,
userService,
encryptor,
/* configuredForClustering */ true,
/* NodeProtocolSender */ protocolSender);
flowController.setClusterManagerRemoteSiteInfo(properties.getRemoteInputPort(), properties.isSiteToSiteSecure());
@ -375,12 +375,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
private FlowController(
final FlowFileEventRepository flowFileEventRepo,
final NiFiProperties properties,
final UserService userService,
final StringEncryptor encryptor,
final boolean configuredForClustering,
final NodeProtocolSender protocolSender) {
final FlowFileEventRepository flowFileEventRepo,
final NiFiProperties properties,
final UserService userService,
final StringEncryptor encryptor,
final boolean configuredForClustering,
final NodeProtocolSender protocolSender) {
maxTimerDrivenThreads = new AtomicInteger(10);
maxEventDrivenThreads = new AtomicInteger(5);
@ -416,7 +416,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceEventRepository);
processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, new EventDrivenSchedulingAgent(
eventDrivenEngineRef.get(), this, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor));
eventDrivenEngineRef.get(), this, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor));
final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor);
final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor);
@ -468,7 +468,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
externalSiteListener = null;
} else if (isSiteToSiteSecure && sslContext == null) {
LOG.error("Unable to create Secure Site-to-Site Listener because not all required Keystore/Truststore "
+ "Properties are set. Site-to-Site functionality will be disabled until this problem is has been fixed.");
+ "Properties are set. Site-to-Site functionality will be disabled until this problem is has been fixed.");
externalSiteListener = null;
} else {
// Register the SocketFlowFileServerProtocol as the appropriate resource for site-to-site Server Protocol
@ -501,7 +501,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, DEFAULT_FLOWFILE_REPO_IMPLEMENTATION);
if (implementationClassName == null) {
throw new RuntimeException("Cannot create FlowFile Repository because the NiFi Properties is missing the following property: "
+ NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION);
+ NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION);
}
try {
@ -612,7 +612,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
startConnectable(connectable);
}
} catch (final Throwable t) {
LOG.error("Unable to start {} due to {}", new Object[]{connectable, t.toString()});
LOG.error("Unable to start {} due to {}", new Object[] {connectable, t.toString()});
if (LOG.isDebugEnabled()) {
LOG.error("", t);
}
@ -627,7 +627,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
remoteGroupPort.getRemoteProcessGroup().startTransmitting(remoteGroupPort);
startedTransmitting++;
} catch (final Throwable t) {
LOG.error("Unable to start transmitting with {} due to {}", new Object[]{remoteGroupPort, t});
LOG.error("Unable to start transmitting with {} due to {}", new Object[] {remoteGroupPort, t});
}
}
@ -642,7 +642,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
startConnectable(connectable);
}
} catch (final Throwable t) {
LOG.error("Unable to start {} due to {}", new Object[]{connectable, t});
LOG.error("Unable to start {} due to {}", new Object[] {connectable, t});
}
}
@ -658,7 +658,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final String implementationClassName = properties.getProperty(NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION, DEFAULT_CONTENT_REPO_IMPLEMENTATION);
if (implementationClassName == null) {
throw new RuntimeException("Cannot create Provenance Repository because the NiFi Properties is missing the following property: "
+ NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION);
+ NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION);
}
try {
@ -676,7 +676,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final String implementationClassName = properties.getProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, DEFAULT_PROVENANCE_REPO_IMPLEMENTATION);
if (implementationClassName == null) {
throw new RuntimeException("Cannot create Provenance Repository because the NiFi Properties is missing the following property: "
+ NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS);
+ NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS);
}
try {
@ -690,7 +690,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final String implementationClassName = properties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION);
if (implementationClassName == null) {
throw new RuntimeException("Cannot create Component Status Repository because the NiFi Properties is missing the following property: "
+ NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
+ NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
}
try {
@ -910,7 +910,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
name = requireNonNull(name).intern();
verifyPortIdDoesNotExist(id);
return new StandardRootGroupPort(id, name, null, TransferDirection.RECEIVE, ConnectableType.INPUT_PORT,
userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
}
/**
@ -927,7 +927,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
name = requireNonNull(name).intern();
verifyPortIdDoesNotExist(id);
return new StandardRootGroupPort(id, name, null, TransferDirection.SEND, ConnectableType.OUTPUT_PORT,
userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
}
/**
@ -1083,14 +1083,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
try {
flowFileRepository.close();
} catch (final Throwable t) {
LOG.warn("Unable to shut down FlowFileRepository due to {}", new Object[]{t});
LOG.warn("Unable to shut down FlowFileRepository due to {}", new Object[] {t});
}
if (this.timerDrivenEngineRef.get().isTerminated() && eventDrivenEngineRef.get().isTerminated()) {
LOG.info("Controller has been terminated successfully.");
} else {
LOG.warn("Controller hasn't terminated properly. There exists an uninterruptable thread that "
+ "will take an indeterminate amount of time to stop. Might need to kill the program manually.");
+ "will take an indeterminate amount of time to stop. Might need to kill the program manually.");
}
if (externalSiteListener != null) {
@ -1153,7 +1153,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* @throws FlowSynchronizationException if updates to the controller failed. If this exception is thrown, then the controller should be considered unsafe to be used
*/
public void synchronize(final FlowSynchronizer synchronizer, final DataFlow dataFlow)
throws FlowSerializationException, FlowSynchronizationException, UninheritableFlowException {
throws FlowSerializationException, FlowSynchronizationException, UninheritableFlowException {
writeLock.lock();
try {
LOG.debug("Synchronizing controller with proposed flow");
@ -1199,7 +1199,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
*
* @param maxThreadCount
*
* This method must be called while holding the write lock!
* This method must be called while holding the write lock!
*/
private void setMaxThreadCount(final int maxThreadCount, final FlowEngine engine, final AtomicInteger maxThreads) {
if (maxThreadCount < 1) {
@ -1267,7 +1267,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* @throws ProcessorInstantiationException
*
* @throws IllegalStateException if no process group can be found with the ID of DTO or with the ID of the DTO's parentGroupId, if the template ID specified is invalid, or if the DTO's Parent
* Group ID changes but the parent group has incoming or outgoing connections
* Group ID changes but the parent group has incoming or outgoing connections
*
* @throws NullPointerException if the DTO or its ID is null
*/
@ -1371,7 +1371,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
*
* @throws NullPointerException if either argument is null
* @throws IllegalStateException if the snippet is not valid because a component in the snippet has an ID that is not unique to this flow, or because it shares an Input Port or Output Port at the
* root level whose name already exists in the given ProcessGroup, or because the Template contains a Processor or a Prioritizer whose class is not valid within this instance of NiFi.
* root level whose name already exists in the given ProcessGroup, or because the Template contains a Processor or a Prioritizer whose class is not valid within this instance of NiFi.
* @throws ProcessorInstantiationException if unable to instantiate a processor
*/
public void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto) throws ProcessorInstantiationException {
@ -2542,7 +2542,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
if (firstTimeAdded) {
final ComponentLog componentLog = new SimpleProcessLogger(id, taskNode.getReportingTask());
final ReportingInitializationContext config = new StandardReportingInitializationContext(id, taskNode.getName(),
SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, this);
SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, this);
try {
task.initialize(config);
@ -2888,7 +2888,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
readLock.lock();
try {
return heartbeatGeneratorFuture != null && !heartbeatGeneratorFuture.isCancelled()
&& heartbeatSenderFuture != null && !heartbeatSenderFuture.isCancelled();
&& heartbeatSenderFuture != null && !heartbeatSenderFuture.isCancelled();
} finally {
readLock.unlock();
}
@ -2948,7 +2948,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
/**
* @return the DN of the Cluster Manager that we are currently connected to, if available. This will return null if the instance is not clustered or if the instance is clustered but the NCM's DN
* is not available - for instance, if cluster communications are not secure
* is not available - for instance, if cluster communications are not secure
*/
public String getClusterManagerDN() {
readLock.lock();
@ -3101,10 +3101,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
@Override
public boolean isContentSame() {
return areEqual(event.getPreviousContentClaimContainer(), event.getContentClaimContainer())
&& areEqual(event.getPreviousContentClaimSection(), event.getContentClaimSection())
&& areEqual(event.getPreviousContentClaimIdentifier(), event.getContentClaimIdentifier())
&& areEqual(event.getPreviousContentClaimOffset(), event.getContentClaimOffset())
&& areEqual(event.getPreviousFileSize(), event.getFileSize());
&& areEqual(event.getPreviousContentClaimSection(), event.getContentClaimSection())
&& areEqual(event.getPreviousContentClaimIdentifier(), event.getContentClaimIdentifier())
&& areEqual(event.getPreviousContentClaimOffset(), event.getContentClaimOffset())
&& areEqual(event.getPreviousFileSize(), event.getFileSize());
}
@Override
@ -3180,7 +3180,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// Register a Provenance Event to indicate that we replayed the data.
final ProvenanceEventRecord sendEvent = new StandardProvenanceEventRecord.Builder()
.setEventType(ProvenanceEventType.SEND)
.setEventType(ProvenanceEventType.DOWNLOAD)
.setFlowFileUUID(provEvent.getFlowFileUuid())
.setAttributes(provEvent.getAttributes(), Collections.<String, String> emptyMap())
.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), offset, size)
@ -3297,7 +3297,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// Create the ContentClaim
final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false);
event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false);
// Increment Claimant Count, since we will now be referencing the Content Claim
contentClaimManager.incrementClaimantCount(resourceClaim);
@ -3367,7 +3367,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// Update the FlowFile Repository to indicate that we have added the FlowFile to the flow
final StandardRepositoryRecord record = new StandardRepositoryRecord(queue, flowFileRecord);
record.setDestination(queue);
flowFileRepository.updateRepository(Collections.<RepositoryRecord>singleton(record));
flowFileRepository.updateRepository(Collections.<RepositoryRecord> singleton(record));
// Enqueue the data
queue.put(flowFileRecord);
@ -3434,11 +3434,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
protocolSender.sendBulletins(message);
if (LOG.isDebugEnabled()) {
LOG.debug(
String.format(
"Sending bulletins to cluster manager at %s",
dateFormatter.format(new Date())
)
);
String.format(
"Sending bulletins to cluster manager at %s",
dateFormatter.format(new Date())));
}
} catch (final UnknownServiceAddressException usae) {
@ -3496,7 +3494,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
escapedBulletin = BulletinFactory.createBulletin(bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
} else {
escapedBulletin = BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(), bulletin.getSourceType(),
bulletin.getSourceName(), bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
bulletin.getSourceName(), bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
}
} else {
escapedBulletin = bulletin;
@ -3554,9 +3552,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final long sendMillis = TimeUnit.NANOSECONDS.toMillis(sendNanos);
heartbeatLogger.info("Heartbeat created at {} and sent at {}; send took {} millis",
dateFormatter.format(new Date(message.getHeartbeat().getCreatedTimestamp())),
dateFormatter.format(new Date()),
sendMillis);
dateFormatter.format(new Date(message.getHeartbeat().getCreatedTimestamp())),
dateFormatter.format(new Date()),
sendMillis);
} catch (final UnknownServiceAddressException usae) {
if (heartbeatLogger.isDebugEnabled()) {
heartbeatLogger.debug(usae.getMessage());