diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java index 7ac1a65e80..6a5954a16f 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java @@ -106,6 +106,29 @@ public interface ProvenanceRepository extends ProvenanceEventRepository { */ ComputeLineageSubmission submitLineageComputation(String flowFileUuid, NiFiUser user); + /** + * Submits a Lineage Computation to be completed and returns the + * AsynchronousLineageResult that indicates the status of the request and + * the results, if the computation is complete. If the given user does not + * have authorization to view one of the events in the lineage, a placeholder + * event will be used instead that provides none of the event details except + * for the identifier of the component that emitted the Provenance Event. It is + * necessary to include this node in the lineage view so that the lineage makes + * sense, rather than showing disconnected graphs when the user is not authorized + * for all components' provenance events. + * + * This method is preferred to {@link #submitLineageComputation(String, NiFiUser)} because + * it is much more efficient, but the former may be used if a particular Event ID is not + * available. + * + * @param eventId the numeric ID of the event that the lineage is for + * @param user the NiFi User to authorize events against + * + * @return a {@link ComputeLineageSubmission} object that can be used to + * check if the computing is complete and if so get the results + */ + ComputeLineageSubmission submitLineageComputation(long eventId, NiFiUser user); + /** * @param lineageIdentifier identifier of lineage to compute * @param user the user who is retrieving the lineage submission diff --git a/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java b/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java index a13a338dcc..9bc5f0ecd7 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java +++ b/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java @@ -99,6 +99,11 @@ public class MockProvenanceRepository implements ProvenanceRepository { throw new UnsupportedOperationException("MockProvenanceRepository does not support Lineage Computation"); } + @Override + public ComputeLineageSubmission submitLineageComputation(long eventId, NiFiUser user) { + throw new UnsupportedOperationException("MockProvenanceRepository does not support Lineage Computation"); + } + @Override public ComputeLineageSubmission retrieveLineageSubmission(String lineageIdentifier, NiFiUser user) { throw new UnsupportedOperationException("MockProvenanceRepository does not support Lineage Computation"); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java index 0bd84d6d42..22c9ab5042 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java @@ -203,16 +203,24 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { final DisconnectionCode disconnectionCode = connectionStatus.getDisconnectCode(); // Determine whether or not the node should be allowed to be in the cluster still, depending on its reason for disconnection. - if (disconnectionCode == DisconnectionCode.LACK_OF_HEARTBEAT || disconnectionCode == DisconnectionCode.UNABLE_TO_COMMUNICATE) { - clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from node previously " - + "disconnected due to " + disconnectionCode + ". Issuing reconnection request."); + switch (disconnectionCode) { + case LACK_OF_HEARTBEAT: + case UNABLE_TO_COMMUNICATE: + case NODE_SHUTDOWN: + case NOT_YET_CONNECTED: + case STARTUP_FAILURE: { + clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from node previously " + + "disconnected due to " + disconnectionCode + ". Issuing reconnection request."); - clusterCoordinator.requestNodeConnect(nodeId, null); - } else { - // disconnected nodes should not heartbeat, so we need to issue a disconnection request. - logger.info("Ignoring received heartbeat from disconnected node " + nodeId + ". Issuing disconnection request."); - clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE, DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE.toString()); - removeHeartbeat(nodeId); + clusterCoordinator.requestNodeConnect(nodeId, null); + } + default: { + // disconnected nodes should not heartbeat, so we need to issue a disconnection request. + logger.info("Ignoring received heartbeat from disconnected node " + nodeId + ". Issuing disconnection request."); + clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE, + DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE.toString()); + removeHeartbeat(nodeId); + } } return; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index f6973a5448..b7b32ad76e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -16,7 +16,38 @@ */ package org.apache.nifi.controller; -import com.sun.jersey.api.client.ClientHandlerException; +import static java.util.Objects.requireNonNull; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import javax.net.ssl.SSLContext; + import org.apache.commons.collections4.Predicate; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; @@ -183,10 +214,10 @@ import org.apache.nifi.reporting.Severity; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.stream.io.LimitingInputStream; import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.ComponentIdGenerator; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.ReflectionUtils; -import org.apache.nifi.util.ComponentIdGenerator; import org.apache.nifi.web.ResourceNotFoundException; import org.apache.nifi.web.api.dto.ConnectableDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; @@ -208,37 +239,7 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLContext; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static java.util.Objects.requireNonNull; -import static java.util.Objects.requireNonNull; +import com.sun.jersey.api.client.ClientHandlerException; public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider { @@ -303,7 +304,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final Integer remoteInputHttpPort; private final Boolean isSiteToSiteSecure; - private ProcessGroup rootGroup; + private final AtomicReference rootGroupRef = new AtomicReference<>(); private final List startConnectablesAfterInitialization; private final List startRemoteGroupPortsAfterInitialization; private final LeaderElectionManager leaderElectionManager; @@ -516,9 +517,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R this.snippetManager = new SnippetManager(); - rootGroup = new StandardProcessGroup(ComponentIdGenerator.generateId().toString(), this, processScheduler, + + final ProcessGroup rootGroup = new StandardProcessGroup(ComponentIdGenerator.generateId().toString(), this, processScheduler, properties, encryptor, this, this.variableRegistry); rootGroup.setName(DEFAULT_ROOT_GROUP_NAME); + rootGroupRef.set(rootGroup); instanceId = ComponentIdGenerator.generateId().toString(); controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler, bulletinRepository, stateManagerProvider, this.variableRegistry); @@ -1197,6 +1200,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return new StandardRemoteProcessGroup(requireNonNull(id).intern(), requireNonNull(uri).intern(), null, this, sslContext); } + public ProcessGroup getRootGroup() { + return rootGroupRef.get(); + } + /** * Verifies that no output port exists with the given id or name. If this * does not hold true, throws an IllegalStateException @@ -1205,6 +1212,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * @throws IllegalStateException port already exists */ private void verifyPortIdDoesNotExist(final String id) { + final ProcessGroup rootGroup = getRootGroup(); Port port = rootGroup.findOutputPort(id); if (port != null) { throw new IllegalStateException("An Input Port already exists with ID " + id); @@ -1220,12 +1228,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * Group. */ public String getName() { - readLock.lock(); - try { - return rootGroup.getName(); - } finally { - readLock.unlock(); - } + return getRootGroup().getName(); } /** @@ -1235,12 +1238,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * @param name of root group */ public void setName(final String name) { - readLock.lock(); - try { - rootGroup.setName(name); - } finally { - readLock.unlock(); - } + getRootGroup().setName(name); } /** @@ -1248,12 +1246,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * Root Group */ public String getComments() { - readLock.lock(); - try { - return rootGroup.getComments(); - } finally { - readLock.unlock(); - } + return getRootGroup().getComments(); } /** @@ -1263,12 +1256,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * the controller */ public void setComments(final String comments) { - readLock.lock(); - try { - rootGroup.setComments(comments); - } finally { - readLock.unlock(); - } + getRootGroup().setComments(comments); } /** @@ -1327,7 +1315,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } // Trigger any processors' methods marked with @OnShutdown to be called - rootGroup.shutdown(); + getRootGroup().shutdown(); stateManagerProvider.shutdown(); @@ -1495,12 +1483,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * @return the ID of the root group */ public String getRootGroupId() { - readLock.lock(); - try { - return rootGroup.getIdentifier(); - } finally { - readLock.unlock(); - } + return getRootGroup().getIdentifier(); } /** @@ -1519,14 +1502,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R writeLock.lock(); try { - rootGroup = group; - + rootGroupRef.set(group); for (final RemoteSiteListener listener : externalSiteListeners) { - listener.setRootGroup(rootGroup); + listener.setRootGroup(group); } // update the heartbeat bean - this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary())); + this.heartbeatBeanRef.set(new HeartbeatBean(group, isPrimary())); } finally { writeLock.unlock(); } @@ -2198,14 +2180,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R */ public ProcessGroup getGroup(final String id) { requireNonNull(id); - final ProcessGroup root; - readLock.lock(); - try { - root = rootGroup; - } finally { - readLock.unlock(); - } - + final ProcessGroup root = getRootGroup(); final String searchId = id.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id; return root == null ? null : root.findProcessGroup(searchId); } @@ -3458,7 +3433,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } // update the heartbeat bean - this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary())); + this.heartbeatBeanRef.set(new HeartbeatBean(getRootGroup(), isPrimary())); } finally { writeLock.unlock(); } @@ -3876,7 +3851,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R this.connectionStatus = connectionStatus; // update the heartbeat bean - this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary())); + this.heartbeatBeanRef.set(new HeartbeatBean(getRootGroup(), isPrimary())); } finally { rwLock.writeLock().unlock(); } @@ -4018,9 +3993,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // is set to the root group and otherwise assume that the ID is that of a component. final DataAuthorizable authorizable; if (rootGroupId.equals(componentId)) { - authorizable = new DataAuthorizable(rootGroup); + authorizable = new DataAuthorizable(getRootGroup()); } else { - final Connectable connectable = rootGroup.findConnectable(componentId); + final Connectable connectable = getRootGroup().findConnectable(componentId); if (connectable == null) { throw new ResourceNotFoundException("The component that generated this event is no longer part of the data flow."); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml index f67a6cd567..871265e699 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml @@ -98,6 +98,7 @@ + diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceResource.java index 24e303de3b..e618e8ad54 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceResource.java @@ -464,8 +464,8 @@ public class ProvenanceResource extends ApplicationResource { break; case FLOWFILE: // ensure the uuid has been specified - if (requestDto.getUuid() == null) { - throw new IllegalArgumentException("The flowfile uuid must be specified when the event type is FLOWFILE."); + if (requestDto.getUuid() == null && requestDto.getEventId() == null) { + throw new IllegalArgumentException("The flowfile uuid or event id must be specified when the event type is FLOWFILE."); } break; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index 2152c76969..bed66abeb3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -1013,7 +1013,11 @@ public class ControllerFacade implements Authorizable { // submit the event if (LineageRequestType.FLOWFILE.equals(requestDto.getLineageRequestType())) { // submit uuid - result = provenanceRepository.submitLineageComputation(requestDto.getUuid(), NiFiUserUtils.getNiFiUser()); + if (requestDto.getEventId() == null) { + result = provenanceRepository.submitLineageComputation(requestDto.getUuid(), NiFiUserUtils.getNiFiUser()); + } else { + result = provenanceRepository.submitLineageComputation(requestDto.getEventId(), NiFiUserUtils.getNiFiUser()); + } } else { // submit event... (parents or children) if (LineageRequestType.PARENTS.equals(requestDto.getLineageRequestType())) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/provenance/nf-provenance-lineage.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/provenance/nf-provenance-lineage.js index 8287bff0ba..6b038c7342 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/provenance/nf-provenance-lineage.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/provenance/nf-provenance-lineage.js @@ -1293,7 +1293,8 @@ nf.ng.ProvenanceLineage = function () { var lineageRequest = { lineageRequestType: 'FLOWFILE', uuid: flowFileUuid, - clusterNodeId: clusterNodeId + clusterNodeId: clusterNodeId, + eventId: eventId }; // update the progress bar value diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index 87b617f53c..f8bb667c00 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -35,6 +35,7 @@ import org.apache.nifi.events.EventReporter; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.provenance.expiration.ExpirationAction; import org.apache.nifi.provenance.expiration.FileRemovalAction; +import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; import org.apache.nifi.provenance.lineage.FlowFileLineage; import org.apache.nifi.provenance.lineage.Lineage; import org.apache.nifi.provenance.lineage.LineageComputationType; @@ -2169,6 +2170,28 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { return new FlowFileLineage(result.getNodes(), result.getEdges()); } + @Override + public ComputeLineageSubmission submitLineageComputation(final long eventId, final NiFiUser user) { + final ProvenanceEventRecord event; + try { + event = getEvent(eventId); + } catch (final Exception e) { + logger.error("Failed to retrieve Provenance Event with ID " + eventId + " to calculate data lineage due to: " + e, e); + final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections. emptySet(), 1, user.getIdentity()); + result.getResult().setError("Failed to retrieve Provenance Event with ID " + eventId + ". See logs for more information."); + return result; + } + + if (event == null) { + final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections. emptySet(), 1, user.getIdentity()); + result.getResult().setError("Could not find Provenance Event with ID " + eventId); + lineageSubmissionMap.put(result.getLineageIdentifier(), result); + return result; + } + + return submitLineageComputation(Collections.singleton(event.getFlowFileUuid()), user, LineageComputationType.FLOWFILE_LINEAGE, eventId, event.getLineageStartDate(), Long.MAX_VALUE); + } + @Override public AsyncLineageSubmission submitLineageComputation(final String flowFileUuid, final NiFiUser user) { return submitLineageComputation(Collections.singleton(flowFileUuid), user, LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java index e448f277f4..ce62152506 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java @@ -55,11 +55,12 @@ class DocsReader { } final long start = System.nanoTime(); - final int numDocs = Math.min(topDocs.scoreDocs.length, maxResults); + final ScoreDoc[] scoreDocs = topDocs.scoreDocs; + final int numDocs = Math.min(scoreDocs.length, maxResults); final List docs = new ArrayList<>(numDocs); - for (final ScoreDoc scoreDoc : topDocs.scoreDocs) { - final int docId = scoreDoc.doc; + for (int i = numDocs - 1; i >= 0; i--) { + final int docId = scoreDocs[i].doc; final Document d = indexReader.document(docId); docs.add(d); } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java index 07cd1903b2..b93d3b70b6 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java @@ -234,6 +234,9 @@ public class IndexManager implements Closeable { } } + // We found no cached Index Readers. Create a new one. To do this, we need to check + // if we have an Index Writer, and if so create a Reader based on the Index Writer. + // This will provide us a 'near real time' index reader. final IndexWriterCount writerCount = writerCounts.remove(absoluteFile); if ( writerCount == null ) { final Directory directory = FSDirectory.open(absoluteFile); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java index b78dfcddc8..12f4a73dc1 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java @@ -485,8 +485,7 @@ public class TestPersistentProvenanceRepository { assertTrue(newRecordSet.getMatchingEvents().isEmpty()); } - // TODO: Switch to 10,000. - @Test(timeout = 1000000) + @Test(timeout = 10000) public void testModifyIndexWhileSearching() throws IOException, InterruptedException, ParseException { final RepositoryConfiguration config = createConfiguration(); config.setMaxRecordLife(30, TimeUnit.SECONDS); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java index 10026cf9fa..79f7d9f56b 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java @@ -181,6 +181,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { return records.isEmpty() ? null : records.get(0); } + @Override public ProvenanceEventRecord getEvent(final long id) { final List records = ringBuffer.getSelectedElements(new Filter() { @Override @@ -192,6 +193,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { return records.isEmpty() ? null : records.get(0); } + @Override public ProvenanceEventRecord getEvent(final long id, final NiFiUser user) { final ProvenanceEventRecord event = getEvent(id); if (event == null) { @@ -473,6 +475,20 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { return new FlowFileLineage(result.getNodes(), result.getEdges()); } + @Override + public ComputeLineageSubmission submitLineageComputation(final long eventId, final NiFiUser user) { + final ProvenanceEventRecord event = getEvent(eventId); + if (event == null) { + final String userId = user.getIdentity(); + final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections. emptySet(), 1, userId); + result.getResult().setError("Could not find event with ID " + eventId); + lineageSubmissionMap.put(result.getLineageIdentifier(), result); + return result; + } + + return submitLineageComputation(event.getFlowFileUuid(), user); + } + @Override public AsyncLineageSubmission submitLineageComputation(final String flowFileUuid, final NiFiUser user) { return submitLineageComputation(Collections.singleton(flowFileUuid), user, LineageComputationType.FLOWFILE_LINEAGE, null);