NIFI-2452: This closes #771. Ensure that we keep track of how many references we have to each lucene searcher and only close the underlying index reader if there are no references to the searcher. Also updated to prefer newer provenance events over older provenance events, and calculate FlowFile lineage based on an event id instead of a FlowFile UUID, as it's much more efficient

This commit is contained in:
Mark Payne 2016-08-02 09:56:05 -04:00 committed by joewitt
parent e9b87dd734
commit 16348b071d
13 changed files with 159 additions and 100 deletions

View File

@ -106,6 +106,29 @@ public interface ProvenanceRepository extends ProvenanceEventRepository {
*/
ComputeLineageSubmission submitLineageComputation(String flowFileUuid, NiFiUser user);
/**
* Submits a Lineage Computation to be completed and returns the
* AsynchronousLineageResult that indicates the status of the request and
* the results, if the computation is complete. If the given user does not
* have authorization to view one of the events in the lineage, a placeholder
* event will be used instead that provides none of the event details except
* for the identifier of the component that emitted the Provenance Event. It is
* necessary to include this node in the lineage view so that the lineage makes
* sense, rather than showing disconnected graphs when the user is not authorized
* for all components' provenance events.
*
* This method is preferred to {@link #submitLineageComputation(String, NiFiUser)} because
* it is much more efficient, but the former may be used if a particular Event ID is not
* available.
*
* @param eventId the numeric ID of the event that the lineage is for
* @param user the NiFi User to authorize events against
*
* @return a {@link ComputeLineageSubmission} object that can be used to
* check if the computing is complete and if so get the results
*/
ComputeLineageSubmission submitLineageComputation(long eventId, NiFiUser user);
/**
* @param lineageIdentifier identifier of lineage to compute
* @param user the user who is retrieving the lineage submission

View File

@ -99,6 +99,11 @@ public class MockProvenanceRepository implements ProvenanceRepository {
throw new UnsupportedOperationException("MockProvenanceRepository does not support Lineage Computation");
}
@Override
public ComputeLineageSubmission submitLineageComputation(long eventId, NiFiUser user) {
throw new UnsupportedOperationException("MockProvenanceRepository does not support Lineage Computation");
}
@Override
public ComputeLineageSubmission retrieveLineageSubmission(String lineageIdentifier, NiFiUser user) {
throw new UnsupportedOperationException("MockProvenanceRepository does not support Lineage Computation");

View File

@ -203,16 +203,24 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
final DisconnectionCode disconnectionCode = connectionStatus.getDisconnectCode();
// Determine whether or not the node should be allowed to be in the cluster still, depending on its reason for disconnection.
if (disconnectionCode == DisconnectionCode.LACK_OF_HEARTBEAT || disconnectionCode == DisconnectionCode.UNABLE_TO_COMMUNICATE) {
clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from node previously "
+ "disconnected due to " + disconnectionCode + ". Issuing reconnection request.");
switch (disconnectionCode) {
case LACK_OF_HEARTBEAT:
case UNABLE_TO_COMMUNICATE:
case NODE_SHUTDOWN:
case NOT_YET_CONNECTED:
case STARTUP_FAILURE: {
clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from node previously "
+ "disconnected due to " + disconnectionCode + ". Issuing reconnection request.");
clusterCoordinator.requestNodeConnect(nodeId, null);
} else {
// disconnected nodes should not heartbeat, so we need to issue a disconnection request.
logger.info("Ignoring received heartbeat from disconnected node " + nodeId + ". Issuing disconnection request.");
clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE, DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE.toString());
removeHeartbeat(nodeId);
clusterCoordinator.requestNodeConnect(nodeId, null);
}
default: {
// disconnected nodes should not heartbeat, so we need to issue a disconnection request.
logger.info("Ignoring received heartbeat from disconnected node " + nodeId + ". Issuing disconnection request.");
clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE,
DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE.toString());
removeHeartbeat(nodeId);
}
}
return;

View File

@ -16,7 +16,38 @@
*/
package org.apache.nifi.controller;
import com.sun.jersey.api.client.ClientHandlerException;
import static java.util.Objects.requireNonNull;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.net.ssl.SSLContext;
import org.apache.commons.collections4.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
@ -183,10 +214,10 @@ import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.ComponentIdGenerator;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.util.ComponentIdGenerator;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.ConnectableDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
@ -208,37 +239,7 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static java.util.Objects.requireNonNull;
import static java.util.Objects.requireNonNull;
import com.sun.jersey.api.client.ClientHandlerException;
public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider {
@ -303,7 +304,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private final Integer remoteInputHttpPort;
private final Boolean isSiteToSiteSecure;
private ProcessGroup rootGroup;
private final AtomicReference<ProcessGroup> rootGroupRef = new AtomicReference<>();
private final List<Connectable> startConnectablesAfterInitialization;
private final List<RemoteGroupPort> startRemoteGroupPortsAfterInitialization;
private final LeaderElectionManager leaderElectionManager;
@ -516,9 +517,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
this.snippetManager = new SnippetManager();
rootGroup = new StandardProcessGroup(ComponentIdGenerator.generateId().toString(), this, processScheduler,
final ProcessGroup rootGroup = new StandardProcessGroup(ComponentIdGenerator.generateId().toString(), this, processScheduler,
properties, encryptor, this, this.variableRegistry);
rootGroup.setName(DEFAULT_ROOT_GROUP_NAME);
rootGroupRef.set(rootGroup);
instanceId = ComponentIdGenerator.generateId().toString();
controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler, bulletinRepository, stateManagerProvider, this.variableRegistry);
@ -1197,6 +1200,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return new StandardRemoteProcessGroup(requireNonNull(id).intern(), requireNonNull(uri).intern(), null, this, sslContext);
}
public ProcessGroup getRootGroup() {
return rootGroupRef.get();
}
/**
* Verifies that no output port exists with the given id or name. If this
* does not hold true, throws an IllegalStateException
@ -1205,6 +1212,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* @throws IllegalStateException port already exists
*/
private void verifyPortIdDoesNotExist(final String id) {
final ProcessGroup rootGroup = getRootGroup();
Port port = rootGroup.findOutputPort(id);
if (port != null) {
throw new IllegalStateException("An Input Port already exists with ID " + id);
@ -1220,12 +1228,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* Group.
*/
public String getName() {
readLock.lock();
try {
return rootGroup.getName();
} finally {
readLock.unlock();
}
return getRootGroup().getName();
}
/**
@ -1235,12 +1238,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* @param name of root group
*/
public void setName(final String name) {
readLock.lock();
try {
rootGroup.setName(name);
} finally {
readLock.unlock();
}
getRootGroup().setName(name);
}
/**
@ -1248,12 +1246,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* Root Group
*/
public String getComments() {
readLock.lock();
try {
return rootGroup.getComments();
} finally {
readLock.unlock();
}
return getRootGroup().getComments();
}
/**
@ -1263,12 +1256,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* the controller
*/
public void setComments(final String comments) {
readLock.lock();
try {
rootGroup.setComments(comments);
} finally {
readLock.unlock();
}
getRootGroup().setComments(comments);
}
/**
@ -1327,7 +1315,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
// Trigger any processors' methods marked with @OnShutdown to be called
rootGroup.shutdown();
getRootGroup().shutdown();
stateManagerProvider.shutdown();
@ -1495,12 +1483,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* @return the ID of the root group
*/
public String getRootGroupId() {
readLock.lock();
try {
return rootGroup.getIdentifier();
} finally {
readLock.unlock();
}
return getRootGroup().getIdentifier();
}
/**
@ -1519,14 +1502,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
writeLock.lock();
try {
rootGroup = group;
rootGroupRef.set(group);
for (final RemoteSiteListener listener : externalSiteListeners) {
listener.setRootGroup(rootGroup);
listener.setRootGroup(group);
}
// update the heartbeat bean
this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary()));
this.heartbeatBeanRef.set(new HeartbeatBean(group, isPrimary()));
} finally {
writeLock.unlock();
}
@ -2198,14 +2180,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
*/
public ProcessGroup getGroup(final String id) {
requireNonNull(id);
final ProcessGroup root;
readLock.lock();
try {
root = rootGroup;
} finally {
readLock.unlock();
}
final ProcessGroup root = getRootGroup();
final String searchId = id.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id;
return root == null ? null : root.findProcessGroup(searchId);
}
@ -3458,7 +3433,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
// update the heartbeat bean
this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary()));
this.heartbeatBeanRef.set(new HeartbeatBean(getRootGroup(), isPrimary()));
} finally {
writeLock.unlock();
}
@ -3876,7 +3851,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
this.connectionStatus = connectionStatus;
// update the heartbeat bean
this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary()));
this.heartbeatBeanRef.set(new HeartbeatBean(getRootGroup(), isPrimary()));
} finally {
rwLock.writeLock().unlock();
}
@ -4018,9 +3993,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// is set to the root group and otherwise assume that the ID is that of a component.
final DataAuthorizable authorizable;
if (rootGroupId.equals(componentId)) {
authorizable = new DataAuthorizable(rootGroup);
authorizable = new DataAuthorizable(getRootGroup());
} else {
final Connectable connectable = rootGroup.findConnectable(componentId);
final Connectable connectable = getRootGroup().findConnectable(componentId);
if (connectable == null) {
throw new ResourceNotFoundException("The component that generated this event is no longer part of the data flow.");

View File

@ -98,6 +98,7 @@
<logger name="org.apache.zookeeper.ZooKeeper" level="ERROR" />
<logger name="org.apache.curator.framework.recipes.leader.LeaderSelector" level="OFF" />
<logger name="org.apache.curator.ConnectionState" level="OFF" />
<!-- Logger for managing logging statements for nifi clusters. -->
<logger name="org.apache.nifi.cluster" level="INFO"/>

View File

@ -464,8 +464,8 @@ public class ProvenanceResource extends ApplicationResource {
break;
case FLOWFILE:
// ensure the uuid has been specified
if (requestDto.getUuid() == null) {
throw new IllegalArgumentException("The flowfile uuid must be specified when the event type is FLOWFILE.");
if (requestDto.getUuid() == null && requestDto.getEventId() == null) {
throw new IllegalArgumentException("The flowfile uuid or event id must be specified when the event type is FLOWFILE.");
}
break;
}

View File

@ -1013,7 +1013,11 @@ public class ControllerFacade implements Authorizable {
// submit the event
if (LineageRequestType.FLOWFILE.equals(requestDto.getLineageRequestType())) {
// submit uuid
result = provenanceRepository.submitLineageComputation(requestDto.getUuid(), NiFiUserUtils.getNiFiUser());
if (requestDto.getEventId() == null) {
result = provenanceRepository.submitLineageComputation(requestDto.getUuid(), NiFiUserUtils.getNiFiUser());
} else {
result = provenanceRepository.submitLineageComputation(requestDto.getEventId(), NiFiUserUtils.getNiFiUser());
}
} else {
// submit event... (parents or children)
if (LineageRequestType.PARENTS.equals(requestDto.getLineageRequestType())) {

View File

@ -1293,7 +1293,8 @@ nf.ng.ProvenanceLineage = function () {
var lineageRequest = {
lineageRequestType: 'FLOWFILE',
uuid: flowFileUuid,
clusterNodeId: clusterNodeId
clusterNodeId: clusterNodeId,
eventId: eventId
};
// update the progress bar value

View File

@ -35,6 +35,7 @@ import org.apache.nifi.events.EventReporter;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.provenance.expiration.ExpirationAction;
import org.apache.nifi.provenance.expiration.FileRemovalAction;
import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
import org.apache.nifi.provenance.lineage.FlowFileLineage;
import org.apache.nifi.provenance.lineage.Lineage;
import org.apache.nifi.provenance.lineage.LineageComputationType;
@ -2169,6 +2170,28 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
return new FlowFileLineage(result.getNodes(), result.getEdges());
}
@Override
public ComputeLineageSubmission submitLineageComputation(final long eventId, final NiFiUser user) {
final ProvenanceEventRecord event;
try {
event = getEvent(eventId);
} catch (final Exception e) {
logger.error("Failed to retrieve Provenance Event with ID " + eventId + " to calculate data lineage due to: " + e, e);
final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.<String> emptySet(), 1, user.getIdentity());
result.getResult().setError("Failed to retrieve Provenance Event with ID " + eventId + ". See logs for more information.");
return result;
}
if (event == null) {
final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.<String> emptySet(), 1, user.getIdentity());
result.getResult().setError("Could not find Provenance Event with ID " + eventId);
lineageSubmissionMap.put(result.getLineageIdentifier(), result);
return result;
}
return submitLineageComputation(Collections.singleton(event.getFlowFileUuid()), user, LineageComputationType.FLOWFILE_LINEAGE, eventId, event.getLineageStartDate(), Long.MAX_VALUE);
}
@Override
public AsyncLineageSubmission submitLineageComputation(final String flowFileUuid, final NiFiUser user) {
return submitLineageComputation(Collections.singleton(flowFileUuid), user, LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE);

View File

@ -55,11 +55,12 @@ class DocsReader {
}
final long start = System.nanoTime();
final int numDocs = Math.min(topDocs.scoreDocs.length, maxResults);
final ScoreDoc[] scoreDocs = topDocs.scoreDocs;
final int numDocs = Math.min(scoreDocs.length, maxResults);
final List<Document> docs = new ArrayList<>(numDocs);
for (final ScoreDoc scoreDoc : topDocs.scoreDocs) {
final int docId = scoreDoc.doc;
for (int i = numDocs - 1; i >= 0; i--) {
final int docId = scoreDocs[i].doc;
final Document d = indexReader.document(docId);
docs.add(d);
}

View File

@ -234,6 +234,9 @@ public class IndexManager implements Closeable {
}
}
// We found no cached Index Readers. Create a new one. To do this, we need to check
// if we have an Index Writer, and if so create a Reader based on the Index Writer.
// This will provide us a 'near real time' index reader.
final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
if ( writerCount == null ) {
final Directory directory = FSDirectory.open(absoluteFile);

View File

@ -485,8 +485,7 @@ public class TestPersistentProvenanceRepository {
assertTrue(newRecordSet.getMatchingEvents().isEmpty());
}
// TODO: Switch to 10,000.
@Test(timeout = 1000000)
@Test(timeout = 10000)
public void testModifyIndexWhileSearching() throws IOException, InterruptedException, ParseException {
final RepositoryConfiguration config = createConfiguration();
config.setMaxRecordLife(30, TimeUnit.SECONDS);

View File

@ -181,6 +181,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
return records.isEmpty() ? null : records.get(0);
}
@Override
public ProvenanceEventRecord getEvent(final long id) {
final List<ProvenanceEventRecord> records = ringBuffer.getSelectedElements(new Filter<ProvenanceEventRecord>() {
@Override
@ -192,6 +193,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
return records.isEmpty() ? null : records.get(0);
}
@Override
public ProvenanceEventRecord getEvent(final long id, final NiFiUser user) {
final ProvenanceEventRecord event = getEvent(id);
if (event == null) {
@ -473,6 +475,20 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
return new FlowFileLineage(result.getNodes(), result.getEdges());
}
@Override
public ComputeLineageSubmission submitLineageComputation(final long eventId, final NiFiUser user) {
final ProvenanceEventRecord event = getEvent(eventId);
if (event == null) {
final String userId = user.getIdentity();
final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.<String> emptySet(), 1, userId);
result.getResult().setError("Could not find event with ID " + eventId);
lineageSubmissionMap.put(result.getLineageIdentifier(), result);
return result;
}
return submitLineageComputation(event.getFlowFileUuid(), user);
}
@Override
public AsyncLineageSubmission submitLineageComputation(final String flowFileUuid, final NiFiUser user) {
return submitLineageComputation(Collections.singleton(flowFileUuid), user, LineageComputationType.FLOWFILE_LINEAGE, null);