From 16348b071d361697f517397b2da2ea32c5599c11 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 2 Aug 2016 09:56:05 -0400 Subject: [PATCH] NIFI-2452: This closes #771. Ensure that we keep track of how many references we have to each lucene searcher and only close the underlying index reader if there are no references to the searcher. Also updated to prefer newer provenance events over older provenance events, and calculate FlowFile lineage based on an event id instead of a FlowFile UUID, as it's much more efficient --- .../nifi/provenance/ProvenanceRepository.java | 23 +++ .../provenance/MockProvenanceRepository.java | 5 + .../heartbeat/AbstractHeartbeatMonitor.java | 26 ++-- .../nifi/controller/FlowController.java | 139 +++++++----------- .../src/main/resources/conf/logback.xml | 1 + .../nifi/web/api/ProvenanceResource.java | 4 +- .../nifi/web/controller/ControllerFacade.java | 6 +- .../js/nf/provenance/nf-provenance-lineage.js | 3 +- .../PersistentProvenanceRepository.java | 23 +++ .../nifi/provenance/lucene/DocsReader.java | 7 +- .../nifi/provenance/lucene/IndexManager.java | 3 + .../TestPersistentProvenanceRepository.java | 3 +- .../VolatileProvenanceRepository.java | 16 ++ 13 files changed, 159 insertions(+), 100 deletions(-) diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java index 7ac1a65e80..6a5954a16f 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java @@ -106,6 +106,29 @@ public interface ProvenanceRepository extends ProvenanceEventRepository { */ ComputeLineageSubmission submitLineageComputation(String flowFileUuid, NiFiUser user); + /** + * Submits a Lineage Computation to be completed and returns the + * AsynchronousLineageResult that indicates the status of the request and + * the results, if the computation is complete. If the given user does not + * have authorization to view one of the events in the lineage, a placeholder + * event will be used instead that provides none of the event details except + * for the identifier of the component that emitted the Provenance Event. It is + * necessary to include this node in the lineage view so that the lineage makes + * sense, rather than showing disconnected graphs when the user is not authorized + * for all components' provenance events. + * + * This method is preferred to {@link #submitLineageComputation(String, NiFiUser)} because + * it is much more efficient, but the former may be used if a particular Event ID is not + * available. + * + * @param eventId the numeric ID of the event that the lineage is for + * @param user the NiFi User to authorize events against + * + * @return a {@link ComputeLineageSubmission} object that can be used to + * check if the computing is complete and if so get the results + */ + ComputeLineageSubmission submitLineageComputation(long eventId, NiFiUser user); + /** * @param lineageIdentifier identifier of lineage to compute * @param user the user who is retrieving the lineage submission diff --git a/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java b/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java index a13a338dcc..9bc5f0ecd7 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java +++ b/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java @@ -99,6 +99,11 @@ public class MockProvenanceRepository implements ProvenanceRepository { throw new UnsupportedOperationException("MockProvenanceRepository does not support Lineage Computation"); } + @Override + public ComputeLineageSubmission submitLineageComputation(long eventId, NiFiUser user) { + throw new UnsupportedOperationException("MockProvenanceRepository does not support Lineage Computation"); + } + @Override public ComputeLineageSubmission retrieveLineageSubmission(String lineageIdentifier, NiFiUser user) { throw new UnsupportedOperationException("MockProvenanceRepository does not support Lineage Computation"); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java index 0bd84d6d42..22c9ab5042 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java @@ -203,16 +203,24 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { final DisconnectionCode disconnectionCode = connectionStatus.getDisconnectCode(); // Determine whether or not the node should be allowed to be in the cluster still, depending on its reason for disconnection. - if (disconnectionCode == DisconnectionCode.LACK_OF_HEARTBEAT || disconnectionCode == DisconnectionCode.UNABLE_TO_COMMUNICATE) { - clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from node previously " - + "disconnected due to " + disconnectionCode + ". Issuing reconnection request."); + switch (disconnectionCode) { + case LACK_OF_HEARTBEAT: + case UNABLE_TO_COMMUNICATE: + case NODE_SHUTDOWN: + case NOT_YET_CONNECTED: + case STARTUP_FAILURE: { + clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from node previously " + + "disconnected due to " + disconnectionCode + ". Issuing reconnection request."); - clusterCoordinator.requestNodeConnect(nodeId, null); - } else { - // disconnected nodes should not heartbeat, so we need to issue a disconnection request. - logger.info("Ignoring received heartbeat from disconnected node " + nodeId + ". Issuing disconnection request."); - clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE, DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE.toString()); - removeHeartbeat(nodeId); + clusterCoordinator.requestNodeConnect(nodeId, null); + } + default: { + // disconnected nodes should not heartbeat, so we need to issue a disconnection request. + logger.info("Ignoring received heartbeat from disconnected node " + nodeId + ". Issuing disconnection request."); + clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE, + DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE.toString()); + removeHeartbeat(nodeId); + } } return; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index f6973a5448..b7b32ad76e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -16,7 +16,38 @@ */ package org.apache.nifi.controller; -import com.sun.jersey.api.client.ClientHandlerException; +import static java.util.Objects.requireNonNull; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import javax.net.ssl.SSLContext; + import org.apache.commons.collections4.Predicate; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; @@ -183,10 +214,10 @@ import org.apache.nifi.reporting.Severity; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.stream.io.LimitingInputStream; import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.ComponentIdGenerator; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.ReflectionUtils; -import org.apache.nifi.util.ComponentIdGenerator; import org.apache.nifi.web.ResourceNotFoundException; import org.apache.nifi.web.api.dto.ConnectableDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; @@ -208,37 +239,7 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLContext; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static java.util.Objects.requireNonNull; -import static java.util.Objects.requireNonNull; +import com.sun.jersey.api.client.ClientHandlerException; public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider { @@ -303,7 +304,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final Integer remoteInputHttpPort; private final Boolean isSiteToSiteSecure; - private ProcessGroup rootGroup; + private final AtomicReference rootGroupRef = new AtomicReference<>(); private final List startConnectablesAfterInitialization; private final List startRemoteGroupPortsAfterInitialization; private final LeaderElectionManager leaderElectionManager; @@ -516,9 +517,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R this.snippetManager = new SnippetManager(); - rootGroup = new StandardProcessGroup(ComponentIdGenerator.generateId().toString(), this, processScheduler, + + final ProcessGroup rootGroup = new StandardProcessGroup(ComponentIdGenerator.generateId().toString(), this, processScheduler, properties, encryptor, this, this.variableRegistry); rootGroup.setName(DEFAULT_ROOT_GROUP_NAME); + rootGroupRef.set(rootGroup); instanceId = ComponentIdGenerator.generateId().toString(); controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler, bulletinRepository, stateManagerProvider, this.variableRegistry); @@ -1197,6 +1200,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return new StandardRemoteProcessGroup(requireNonNull(id).intern(), requireNonNull(uri).intern(), null, this, sslContext); } + public ProcessGroup getRootGroup() { + return rootGroupRef.get(); + } + /** * Verifies that no output port exists with the given id or name. If this * does not hold true, throws an IllegalStateException @@ -1205,6 +1212,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * @throws IllegalStateException port already exists */ private void verifyPortIdDoesNotExist(final String id) { + final ProcessGroup rootGroup = getRootGroup(); Port port = rootGroup.findOutputPort(id); if (port != null) { throw new IllegalStateException("An Input Port already exists with ID " + id); @@ -1220,12 +1228,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * Group. */ public String getName() { - readLock.lock(); - try { - return rootGroup.getName(); - } finally { - readLock.unlock(); - } + return getRootGroup().getName(); } /** @@ -1235,12 +1238,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * @param name of root group */ public void setName(final String name) { - readLock.lock(); - try { - rootGroup.setName(name); - } finally { - readLock.unlock(); - } + getRootGroup().setName(name); } /** @@ -1248,12 +1246,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * Root Group */ public String getComments() { - readLock.lock(); - try { - return rootGroup.getComments(); - } finally { - readLock.unlock(); - } + return getRootGroup().getComments(); } /** @@ -1263,12 +1256,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * the controller */ public void setComments(final String comments) { - readLock.lock(); - try { - rootGroup.setComments(comments); - } finally { - readLock.unlock(); - } + getRootGroup().setComments(comments); } /** @@ -1327,7 +1315,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } // Trigger any processors' methods marked with @OnShutdown to be called - rootGroup.shutdown(); + getRootGroup().shutdown(); stateManagerProvider.shutdown(); @@ -1495,12 +1483,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * @return the ID of the root group */ public String getRootGroupId() { - readLock.lock(); - try { - return rootGroup.getIdentifier(); - } finally { - readLock.unlock(); - } + return getRootGroup().getIdentifier(); } /** @@ -1519,14 +1502,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R writeLock.lock(); try { - rootGroup = group; - + rootGroupRef.set(group); for (final RemoteSiteListener listener : externalSiteListeners) { - listener.setRootGroup(rootGroup); + listener.setRootGroup(group); } // update the heartbeat bean - this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary())); + this.heartbeatBeanRef.set(new HeartbeatBean(group, isPrimary())); } finally { writeLock.unlock(); } @@ -2198,14 +2180,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R */ public ProcessGroup getGroup(final String id) { requireNonNull(id); - final ProcessGroup root; - readLock.lock(); - try { - root = rootGroup; - } finally { - readLock.unlock(); - } - + final ProcessGroup root = getRootGroup(); final String searchId = id.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id; return root == null ? null : root.findProcessGroup(searchId); } @@ -3458,7 +3433,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } // update the heartbeat bean - this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary())); + this.heartbeatBeanRef.set(new HeartbeatBean(getRootGroup(), isPrimary())); } finally { writeLock.unlock(); } @@ -3876,7 +3851,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R this.connectionStatus = connectionStatus; // update the heartbeat bean - this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary())); + this.heartbeatBeanRef.set(new HeartbeatBean(getRootGroup(), isPrimary())); } finally { rwLock.writeLock().unlock(); } @@ -4018,9 +3993,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // is set to the root group and otherwise assume that the ID is that of a component. final DataAuthorizable authorizable; if (rootGroupId.equals(componentId)) { - authorizable = new DataAuthorizable(rootGroup); + authorizable = new DataAuthorizable(getRootGroup()); } else { - final Connectable connectable = rootGroup.findConnectable(componentId); + final Connectable connectable = getRootGroup().findConnectable(componentId); if (connectable == null) { throw new ResourceNotFoundException("The component that generated this event is no longer part of the data flow."); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml index f67a6cd567..871265e699 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml @@ -98,6 +98,7 @@ + diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceResource.java index 24e303de3b..e618e8ad54 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceResource.java @@ -464,8 +464,8 @@ public class ProvenanceResource extends ApplicationResource { break; case FLOWFILE: // ensure the uuid has been specified - if (requestDto.getUuid() == null) { - throw new IllegalArgumentException("The flowfile uuid must be specified when the event type is FLOWFILE."); + if (requestDto.getUuid() == null && requestDto.getEventId() == null) { + throw new IllegalArgumentException("The flowfile uuid or event id must be specified when the event type is FLOWFILE."); } break; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index 2152c76969..bed66abeb3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -1013,7 +1013,11 @@ public class ControllerFacade implements Authorizable { // submit the event if (LineageRequestType.FLOWFILE.equals(requestDto.getLineageRequestType())) { // submit uuid - result = provenanceRepository.submitLineageComputation(requestDto.getUuid(), NiFiUserUtils.getNiFiUser()); + if (requestDto.getEventId() == null) { + result = provenanceRepository.submitLineageComputation(requestDto.getUuid(), NiFiUserUtils.getNiFiUser()); + } else { + result = provenanceRepository.submitLineageComputation(requestDto.getEventId(), NiFiUserUtils.getNiFiUser()); + } } else { // submit event... (parents or children) if (LineageRequestType.PARENTS.equals(requestDto.getLineageRequestType())) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/provenance/nf-provenance-lineage.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/provenance/nf-provenance-lineage.js index 8287bff0ba..6b038c7342 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/provenance/nf-provenance-lineage.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/provenance/nf-provenance-lineage.js @@ -1293,7 +1293,8 @@ nf.ng.ProvenanceLineage = function () { var lineageRequest = { lineageRequestType: 'FLOWFILE', uuid: flowFileUuid, - clusterNodeId: clusterNodeId + clusterNodeId: clusterNodeId, + eventId: eventId }; // update the progress bar value diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index 87b617f53c..f8bb667c00 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -35,6 +35,7 @@ import org.apache.nifi.events.EventReporter; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.provenance.expiration.ExpirationAction; import org.apache.nifi.provenance.expiration.FileRemovalAction; +import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; import org.apache.nifi.provenance.lineage.FlowFileLineage; import org.apache.nifi.provenance.lineage.Lineage; import org.apache.nifi.provenance.lineage.LineageComputationType; @@ -2169,6 +2170,28 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { return new FlowFileLineage(result.getNodes(), result.getEdges()); } + @Override + public ComputeLineageSubmission submitLineageComputation(final long eventId, final NiFiUser user) { + final ProvenanceEventRecord event; + try { + event = getEvent(eventId); + } catch (final Exception e) { + logger.error("Failed to retrieve Provenance Event with ID " + eventId + " to calculate data lineage due to: " + e, e); + final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections. emptySet(), 1, user.getIdentity()); + result.getResult().setError("Failed to retrieve Provenance Event with ID " + eventId + ". See logs for more information."); + return result; + } + + if (event == null) { + final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections. emptySet(), 1, user.getIdentity()); + result.getResult().setError("Could not find Provenance Event with ID " + eventId); + lineageSubmissionMap.put(result.getLineageIdentifier(), result); + return result; + } + + return submitLineageComputation(Collections.singleton(event.getFlowFileUuid()), user, LineageComputationType.FLOWFILE_LINEAGE, eventId, event.getLineageStartDate(), Long.MAX_VALUE); + } + @Override public AsyncLineageSubmission submitLineageComputation(final String flowFileUuid, final NiFiUser user) { return submitLineageComputation(Collections.singleton(flowFileUuid), user, LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java index e448f277f4..ce62152506 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java @@ -55,11 +55,12 @@ class DocsReader { } final long start = System.nanoTime(); - final int numDocs = Math.min(topDocs.scoreDocs.length, maxResults); + final ScoreDoc[] scoreDocs = topDocs.scoreDocs; + final int numDocs = Math.min(scoreDocs.length, maxResults); final List docs = new ArrayList<>(numDocs); - for (final ScoreDoc scoreDoc : topDocs.scoreDocs) { - final int docId = scoreDoc.doc; + for (int i = numDocs - 1; i >= 0; i--) { + final int docId = scoreDocs[i].doc; final Document d = indexReader.document(docId); docs.add(d); } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java index 07cd1903b2..b93d3b70b6 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java @@ -234,6 +234,9 @@ public class IndexManager implements Closeable { } } + // We found no cached Index Readers. Create a new one. To do this, we need to check + // if we have an Index Writer, and if so create a Reader based on the Index Writer. + // This will provide us a 'near real time' index reader. final IndexWriterCount writerCount = writerCounts.remove(absoluteFile); if ( writerCount == null ) { final Directory directory = FSDirectory.open(absoluteFile); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java index b78dfcddc8..12f4a73dc1 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java @@ -485,8 +485,7 @@ public class TestPersistentProvenanceRepository { assertTrue(newRecordSet.getMatchingEvents().isEmpty()); } - // TODO: Switch to 10,000. - @Test(timeout = 1000000) + @Test(timeout = 10000) public void testModifyIndexWhileSearching() throws IOException, InterruptedException, ParseException { final RepositoryConfiguration config = createConfiguration(); config.setMaxRecordLife(30, TimeUnit.SECONDS); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java index 10026cf9fa..79f7d9f56b 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java @@ -181,6 +181,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { return records.isEmpty() ? null : records.get(0); } + @Override public ProvenanceEventRecord getEvent(final long id) { final List records = ringBuffer.getSelectedElements(new Filter() { @Override @@ -192,6 +193,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { return records.isEmpty() ? null : records.get(0); } + @Override public ProvenanceEventRecord getEvent(final long id, final NiFiUser user) { final ProvenanceEventRecord event = getEvent(id); if (event == null) { @@ -473,6 +475,20 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { return new FlowFileLineage(result.getNodes(), result.getEdges()); } + @Override + public ComputeLineageSubmission submitLineageComputation(final long eventId, final NiFiUser user) { + final ProvenanceEventRecord event = getEvent(eventId); + if (event == null) { + final String userId = user.getIdentity(); + final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections. emptySet(), 1, userId); + result.getResult().setError("Could not find event with ID " + eventId); + lineageSubmissionMap.put(result.getLineageIdentifier(), result); + return result; + } + + return submitLineageComputation(event.getFlowFileUuid(), user); + } + @Override public AsyncLineageSubmission submitLineageComputation(final String flowFileUuid, final NiFiUser user) { return submitLineageComputation(Collections.singleton(flowFileUuid), user, LineageComputationType.FLOWFILE_LINEAGE, null);