NIFI-1135:

- Adding additional parameters to be able to limit the size of the provenance response. Specifically, whether the events should be summarized and whether events should be returned incrementally before the query has completed.
- Ensuring the cluster node address is included in provenance events returned.
- Ensuring there is a cluster coordinator before attempting to get the cluster node address.
- Removing exponential back off between provenance requests.
- Ensuring the content viewer url is retrieve before initializing the provenance table.

This closes #1413.
This commit is contained in:
Matt Gilman 2017-01-13 09:25:10 -05:00 committed by Mark Payne
parent 82cf0c6fa8
commit e925b18fe6
9 changed files with 434 additions and 390 deletions

View File

@ -38,6 +38,9 @@ public class ProvenanceRequestDTO {
private String maximumFileSize; private String maximumFileSize;
private Integer maxResults; private Integer maxResults;
private Boolean summarize;
private Boolean incrementalResults;
/** /**
* @return the search terms to use for this search * @return the search terms to use for this search
*/ */
@ -139,4 +142,34 @@ public class ProvenanceRequestDTO {
public void setClusterNodeId(String clusterNodeId) { public void setClusterNodeId(String clusterNodeId) {
this.clusterNodeId = clusterNodeId; this.clusterNodeId = clusterNodeId;
} }
/**
* @return whether or not incremental results are returned. If false, provenance events
* are only returned once the query completes. This property is true by default.
*/
@ApiModelProperty(
value = "Whether or not incremental results are returned. If false, provenance events"
+ " are only returned once the query completes. This property is true by default."
)
public Boolean getIncrementalResults() {
return incrementalResults;
}
public void setIncrementalResults(Boolean incrementalResults) {
this.incrementalResults = incrementalResults;
}
/**
* @return whether or not to summarize provenance events returned. This property is false by default.
*/
@ApiModelProperty(
value = "Whether or not to summarize provenance events returned. This property is false by default."
)
public Boolean getSummarize() {
return summarize;
}
public void setSummarize(Boolean summarize) {
this.summarize = summarize;
}
} }

View File

@ -206,9 +206,11 @@ public interface NiFiServiceFacade {
* Retrieves provenance. * Retrieves provenance.
* *
* @param queryId identifier * @param queryId identifier
* @param summarize whether to summarize the event dtos
* @param incrementalResults whether to return any events if the search has not finished
* @return result * @return result
*/ */
ProvenanceDTO getProvenance(String queryId); ProvenanceDTO getProvenance(String queryId, Boolean summarize, Boolean incrementalResults);
/** /**
* Deletes provenance. * Deletes provenance.

View File

@ -2148,8 +2148,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
} }
@Override @Override
public ProvenanceDTO getProvenance(final String queryId) { public ProvenanceDTO getProvenance(final String queryId, final Boolean summarize, final Boolean incrementalResults) {
return controllerFacade.getProvenanceQuery(queryId); return controllerFacade.getProvenanceQuery(queryId, summarize, incrementalResults);
} }
@Override @Override

View File

@ -22,7 +22,9 @@ import com.wordnik.swagger.annotations.ApiParam;
import com.wordnik.swagger.annotations.ApiResponse; import com.wordnik.swagger.annotations.ApiResponse;
import com.wordnik.swagger.annotations.ApiResponses; import com.wordnik.swagger.annotations.ApiResponses;
import com.wordnik.swagger.annotations.Authorization; import com.wordnik.swagger.annotations.Authorization;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.web.DownloadableContent; import org.apache.nifi.web.DownloadableContent;
@ -286,6 +288,13 @@ public class ProvenanceEventResource extends ApplicationResource {
final ProvenanceEventDTO event = serviceFacade.getProvenanceEvent(id.getLong()); final ProvenanceEventDTO event = serviceFacade.getProvenanceEvent(id.getLong());
event.setClusterNodeId(clusterNodeId); event.setClusterNodeId(clusterNodeId);
// populate the cluster node address
final ClusterCoordinator coordinator = getClusterCoordinator();
if (coordinator != null) {
final NodeIdentifier nodeId = coordinator.getNodeIdentifier(clusterNodeId);
event.setClusterNodeAddress(nodeId.getApiAddress() + ":" + nodeId.getApiPort());
}
// create a response entity // create a response entity
final ProvenanceEventEntity entity = new ProvenanceEventEntity(); final ProvenanceEventEntity entity = new ProvenanceEventEntity();
entity.setProvenanceEvent(event); entity.setProvenanceEvent(event);

View File

@ -46,6 +46,7 @@ import org.apache.nifi.web.api.entity.ProvenanceOptionsEntity;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes; import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE; import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET; import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod; import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST; import javax.ws.rs.POST;
@ -294,6 +295,17 @@ public class ProvenanceResource extends ApplicationResource {
required = false required = false
) )
@QueryParam("clusterNodeId") final String clusterNodeId, @QueryParam("clusterNodeId") final String clusterNodeId,
@ApiParam(
value = "Whether or not incremental results are returned. If false, provenance events"
+ " are only returned once the query completes. This property is true by default.",
required = false
)
@QueryParam("summarize") @DefaultValue(value = "false") final Boolean summarize,
@ApiParam(
value = "Whether or not to summarize provenance events returned. This property is false by default.",
required = false
)
@QueryParam("incrementalResults") @DefaultValue(value = "true") final Boolean incrementalResults,
@ApiParam( @ApiParam(
value = "The id of the provenance query.", value = "The id of the provenance query.",
required = true required = true
@ -314,7 +326,7 @@ public class ProvenanceResource extends ApplicationResource {
} }
// get the provenance // get the provenance
final ProvenanceDTO dto = serviceFacade.getProvenance(id); final ProvenanceDTO dto = serviceFacade.getProvenance(id, summarize, incrementalResults);
dto.getRequest().setClusterNodeId(clusterNodeId); dto.getRequest().setClusterNodeId(clusterNodeId);
populateRemainingProvenanceContent(dto); populateRemainingProvenanceContent(dto);

View File

@ -972,7 +972,7 @@ public class ControllerFacade implements Authorizable {
final QuerySubmission querySubmission = provenanceRepository.submitQuery(query, NiFiUserUtils.getNiFiUser()); final QuerySubmission querySubmission = provenanceRepository.submitQuery(query, NiFiUserUtils.getNiFiUser());
// return the query with the results populated at this point // return the query with the results populated at this point
return getProvenanceQuery(querySubmission.getQueryIdentifier()); return getProvenanceQuery(querySubmission.getQueryIdentifier(), requestDto.getSummarize(), requestDto.getIncrementalResults());
} }
/** /**
@ -981,7 +981,7 @@ public class ControllerFacade implements Authorizable {
* @param provenanceId id * @param provenanceId id
* @return the results of a provenance query * @return the results of a provenance query
*/ */
public ProvenanceDTO getProvenanceQuery(String provenanceId) { public ProvenanceDTO getProvenanceQuery(String provenanceId, Boolean summarize, Boolean incrementalResults) {
try { try {
// get the query to the provenance repository // get the query to the provenance repository
final ProvenanceRepository provenanceRepository = flowController.getProvenanceRepository(); final ProvenanceRepository provenanceRepository = flowController.getProvenanceRepository();
@ -1027,11 +1027,14 @@ public class ControllerFacade implements Authorizable {
provenanceDto.setPercentCompleted(queryResult.getPercentComplete()); provenanceDto.setPercentCompleted(queryResult.getPercentComplete());
// convert each event // convert each event
final List<ProvenanceEventDTO> events = new ArrayList<>(); final boolean includeResults = incrementalResults == null || Boolean.TRUE.equals(incrementalResults);
for (final ProvenanceEventRecord record : queryResult.getMatchingEvents()) { if (includeResults || queryResult.isFinished()) {
events.add(createProvenanceEventDto(record)); final List<ProvenanceEventDTO> events = new ArrayList<>();
for (final ProvenanceEventRecord record : queryResult.getMatchingEvents()) {
events.add(createProvenanceEventDto(record, Boolean.TRUE.equals(summarize)));
}
resultsDto.setProvenanceEvents(events);
} }
resultsDto.setProvenanceEvents(events);
if (requestDto.getMaxResults() != null && queryResult.getTotalHitCount() >= requestDto.getMaxResults()) { if (requestDto.getMaxResults() != null && queryResult.getTotalHitCount() >= requestDto.getMaxResults()) {
resultsDto.setTotalCount(requestDto.getMaxResults().longValue()); resultsDto.setTotalCount(requestDto.getMaxResults().longValue());
@ -1227,7 +1230,7 @@ public class ControllerFacade implements Authorizable {
final ProvenanceEventRecord event = flowController.replayFlowFile(originalEvent, user); final ProvenanceEventRecord event = flowController.replayFlowFile(originalEvent, user);
// convert the event record // convert the event record
return createProvenanceEventDto(event); return createProvenanceEventDto(event, false);
} catch (final IOException ioe) { } catch (final IOException ioe) {
throw new NiFiCoreException("An error occurred while getting the specified event.", ioe); throw new NiFiCoreException("An error occurred while getting the specified event.", ioe);
} }
@ -1313,7 +1316,7 @@ public class ControllerFacade implements Authorizable {
dataAuthorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser(), attributes); dataAuthorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser(), attributes);
// convert the event // convert the event
return createProvenanceEventDto(event); return createProvenanceEventDto(event, false);
} catch (final IOException ioe) { } catch (final IOException ioe) {
throw new NiFiCoreException("An error occurred while getting the specified event.", ioe); throw new NiFiCoreException("An error occurred while getting the specified event.", ioe);
} }
@ -1325,117 +1328,121 @@ public class ControllerFacade implements Authorizable {
* @param event event * @param event event
* @return event * @return event
*/ */
private ProvenanceEventDTO createProvenanceEventDto(final ProvenanceEventRecord event) { private ProvenanceEventDTO createProvenanceEventDto(final ProvenanceEventRecord event, final boolean summarize) {
// convert the attributes
final Comparator<AttributeDTO> attributeComparator = new Comparator<AttributeDTO>() {
@Override
public int compare(AttributeDTO a1, AttributeDTO a2) {
return Collator.getInstance(Locale.US).compare(a1.getName(), a2.getName());
}
};
final SortedSet<AttributeDTO> attributes = new TreeSet<>(attributeComparator);
final Map<String, String> updatedAttrs = event.getUpdatedAttributes();
final Map<String, String> previousAttrs = event.getPreviousAttributes();
// add previous attributes that haven't been modified.
for (final Map.Entry<String, String> entry : previousAttrs.entrySet()) {
// don't add any attributes that have been updated; we will do that next
if (updatedAttrs.containsKey(entry.getKey())) {
continue;
}
final AttributeDTO attribute = new AttributeDTO();
attribute.setName(entry.getKey());
attribute.setValue(entry.getValue());
attribute.setPreviousValue(entry.getValue());
attributes.add(attribute);
}
// Add all of the update attributes
for (final Map.Entry<String, String> entry : updatedAttrs.entrySet()) {
final AttributeDTO attribute = new AttributeDTO();
attribute.setName(entry.getKey());
attribute.setValue(entry.getValue());
attribute.setPreviousValue(previousAttrs.get(entry.getKey()));
attributes.add(attribute);
}
// build the event dto
final ProvenanceEventDTO dto = new ProvenanceEventDTO(); final ProvenanceEventDTO dto = new ProvenanceEventDTO();
dto.setId(String.valueOf(event.getEventId())); dto.setId(String.valueOf(event.getEventId()));
dto.setAlternateIdentifierUri(event.getAlternateIdentifierUri());
dto.setAttributes(attributes);
dto.setTransitUri(event.getTransitUri());
dto.setEventId(event.getEventId()); dto.setEventId(event.getEventId());
dto.setEventTime(new Date(event.getEventTime())); dto.setEventTime(new Date(event.getEventTime()));
dto.setEventType(event.getEventType().name()); dto.setEventType(event.getEventType().name());
dto.setFlowFileUuid(event.getFlowFileUuid());
dto.setFileSize(FormatUtils.formatDataSize(event.getFileSize())); dto.setFileSize(FormatUtils.formatDataSize(event.getFileSize()));
dto.setFileSizeBytes(event.getFileSize()); dto.setFileSizeBytes(event.getFileSize());
dto.setComponentId(event.getComponentId()); dto.setComponentId(event.getComponentId());
dto.setComponentType(event.getComponentType()); dto.setComponentType(event.getComponentType());
dto.setSourceSystemFlowFileId(event.getSourceSystemFlowFileIdentifier());
dto.setFlowFileUuid(event.getFlowFileUuid());
dto.setRelationship(event.getRelationship());
dto.setDetails(event.getDetails());
final ContentAvailability contentAvailability = flowController.getContentAvailability(event);
// content
dto.setContentEqual(contentAvailability.isContentSame());
dto.setInputContentAvailable(contentAvailability.isInputAvailable());
dto.setInputContentClaimSection(event.getPreviousContentClaimSection());
dto.setInputContentClaimContainer(event.getPreviousContentClaimContainer());
dto.setInputContentClaimIdentifier(event.getPreviousContentClaimIdentifier());
dto.setInputContentClaimOffset(event.getPreviousContentClaimOffset());
dto.setInputContentClaimFileSizeBytes(event.getPreviousFileSize());
dto.setOutputContentAvailable(contentAvailability.isOutputAvailable());
dto.setOutputContentClaimSection(event.getContentClaimSection());
dto.setOutputContentClaimContainer(event.getContentClaimContainer());
dto.setOutputContentClaimIdentifier(event.getContentClaimIdentifier());
dto.setOutputContentClaimOffset(event.getContentClaimOffset());
dto.setOutputContentClaimFileSize(FormatUtils.formatDataSize(event.getFileSize()));
dto.setOutputContentClaimFileSizeBytes(event.getFileSize());
// format the previous file sizes if possible
if (event.getPreviousFileSize() != null) {
dto.setInputContentClaimFileSize(FormatUtils.formatDataSize(event.getPreviousFileSize()));
}
// determine if authorized for event replay
final AuthorizationResult replayAuthorized = checkAuthorizationForReplay(event);
// replay
dto.setReplayAvailable(contentAvailability.isReplayable() && Result.Approved.equals(replayAuthorized.getResult()));
dto.setReplayExplanation(contentAvailability.isReplayable()
&& !Result.Approved.equals(replayAuthorized.getResult()) ? replayAuthorized.getExplanation() : contentAvailability.getReasonNotReplayable());
dto.setSourceConnectionIdentifier(event.getSourceQueueIdentifier());
// sets the component details if it can find the component still in the flow // sets the component details if it can find the component still in the flow
setComponentDetails(dto); setComponentDetails(dto);
// event duration // only include all details if not summarizing
if (event.getEventDuration() >= 0) { if (!summarize) {
dto.setEventDuration(event.getEventDuration()); // convert the attributes
final Comparator<AttributeDTO> attributeComparator = new Comparator<AttributeDTO>() {
@Override
public int compare(AttributeDTO a1, AttributeDTO a2) {
return Collator.getInstance(Locale.US).compare(a1.getName(), a2.getName());
}
};
final SortedSet<AttributeDTO> attributes = new TreeSet<>(attributeComparator);
final Map<String, String> updatedAttrs = event.getUpdatedAttributes();
final Map<String, String> previousAttrs = event.getPreviousAttributes();
// add previous attributes that haven't been modified.
for (final Map.Entry<String, String> entry : previousAttrs.entrySet()) {
// don't add any attributes that have been updated; we will do that next
if (updatedAttrs.containsKey(entry.getKey())) {
continue;
}
final AttributeDTO attribute = new AttributeDTO();
attribute.setName(entry.getKey());
attribute.setValue(entry.getValue());
attribute.setPreviousValue(entry.getValue());
attributes.add(attribute);
}
// Add all of the update attributes
for (final Map.Entry<String, String> entry : updatedAttrs.entrySet()) {
final AttributeDTO attribute = new AttributeDTO();
attribute.setName(entry.getKey());
attribute.setValue(entry.getValue());
attribute.setPreviousValue(previousAttrs.get(entry.getKey()));
attributes.add(attribute);
}
// additional event details
dto.setAlternateIdentifierUri(event.getAlternateIdentifierUri());
dto.setAttributes(attributes);
dto.setTransitUri(event.getTransitUri());
dto.setSourceSystemFlowFileId(event.getSourceSystemFlowFileIdentifier());
dto.setRelationship(event.getRelationship());
dto.setDetails(event.getDetails());
final ContentAvailability contentAvailability = flowController.getContentAvailability(event);
// content
dto.setContentEqual(contentAvailability.isContentSame());
dto.setInputContentAvailable(contentAvailability.isInputAvailable());
dto.setInputContentClaimSection(event.getPreviousContentClaimSection());
dto.setInputContentClaimContainer(event.getPreviousContentClaimContainer());
dto.setInputContentClaimIdentifier(event.getPreviousContentClaimIdentifier());
dto.setInputContentClaimOffset(event.getPreviousContentClaimOffset());
dto.setInputContentClaimFileSizeBytes(event.getPreviousFileSize());
dto.setOutputContentAvailable(contentAvailability.isOutputAvailable());
dto.setOutputContentClaimSection(event.getContentClaimSection());
dto.setOutputContentClaimContainer(event.getContentClaimContainer());
dto.setOutputContentClaimIdentifier(event.getContentClaimIdentifier());
dto.setOutputContentClaimOffset(event.getContentClaimOffset());
dto.setOutputContentClaimFileSize(FormatUtils.formatDataSize(event.getFileSize()));
dto.setOutputContentClaimFileSizeBytes(event.getFileSize());
// format the previous file sizes if possible
if (event.getPreviousFileSize() != null) {
dto.setInputContentClaimFileSize(FormatUtils.formatDataSize(event.getPreviousFileSize()));
}
// determine if authorized for event replay
final AuthorizationResult replayAuthorized = checkAuthorizationForReplay(event);
// replay
dto.setReplayAvailable(contentAvailability.isReplayable() && Result.Approved.equals(replayAuthorized.getResult()));
dto.setReplayExplanation(contentAvailability.isReplayable()
&& !Result.Approved.equals(replayAuthorized.getResult()) ? replayAuthorized.getExplanation() : contentAvailability.getReasonNotReplayable());
dto.setSourceConnectionIdentifier(event.getSourceQueueIdentifier());
// event duration
if (event.getEventDuration() >= 0) {
dto.setEventDuration(event.getEventDuration());
}
// lineage duration
if (event.getLineageStartDate() > 0) {
final long lineageDuration = event.getEventTime() - event.getLineageStartDate();
dto.setLineageDuration(lineageDuration);
}
// parent uuids
final List<String> parentUuids = new ArrayList<>(event.getParentUuids());
Collections.sort(parentUuids, Collator.getInstance(Locale.US));
dto.setParentUuids(parentUuids);
// child uuids
final List<String> childUuids = new ArrayList<>(event.getChildUuids());
Collections.sort(childUuids, Collator.getInstance(Locale.US));
dto.setChildUuids(childUuids);
} }
// lineage duration
if (event.getLineageStartDate() > 0) {
final long lineageDuration = event.getEventTime() - event.getLineageStartDate();
dto.setLineageDuration(lineageDuration);
}
// parent uuids
final List<String> parentUuids = new ArrayList<>(event.getParentUuids());
Collections.sort(parentUuids, Collator.getInstance(Locale.US));
dto.setParentUuids(parentUuids);
// child uuids
final List<String> childUuids = new ArrayList<>(event.getChildUuids());
Collections.sort(childUuids, Collator.getInstance(Locale.US));
dto.setChildUuids(childUuids);
return dto; return dto;
} }

View File

@ -26,8 +26,7 @@ nf.ng.ProvenanceLineage = function () {
var config = { var config = {
sliderTickCount: 75, sliderTickCount: 75,
urls: { urls: {
lineage: '../nifi-api/provenance/lineage', lineage: '../nifi-api/provenance/lineage'
events: '../nifi-api/provenance-events/'
} }
}; };
@ -90,41 +89,6 @@ nf.ng.ProvenanceLineage = function () {
}); });
}; };
/**
* Shows the details for the specified event.
*
* @param {string} eventId
* @param {string} clusterNodeId The id of the node in the cluster where this event/flowfile originated
*/
var showEventDetails = function (eventId, clusterNodeId, provenanceTableCtrl) {
getEventDetails(eventId, clusterNodeId).done(function (response) {
provenanceTableCtrl.showEventDetails(response.provenanceEvent);
});
};
/**
* Gets the details for the specified event.
*
* @param {string} eventId
* @param {string} clusterNodeId The id of the node in the cluster where this event/flowfile originated
*/
var getEventDetails = function (eventId, clusterNodeId) {
var url;
if (nf.Common.isDefinedAndNotNull(clusterNodeId)) {
url = config.urls.events + encodeURIComponent(eventId) + '?' + $.param({
clusterNodeId: clusterNodeId
});
} else {
url = config.urls.events + encodeURIComponent(eventId);
}
return $.ajax({
type: 'GET',
url: url,
dataType: 'json'
}).fail(nf.Common.handleAjaxError);
};
/** /**
* Submits the specified lineage request. * Submits the specified lineage request.
* *
@ -792,7 +756,7 @@ nf.ng.ProvenanceLineage = function () {
'class': 'lineage-view-event', 'class': 'lineage-view-event',
'text': 'View details', 'text': 'View details',
'click': function () { 'click': function () {
showEventDetails(d.id, clusterNodeId, provenanceTableCtrl); provenanceTableCtrl.showEventDetails(d.id, clusterNodeId);
} }
}]; }];
@ -852,17 +816,17 @@ nf.ng.ProvenanceLineage = function () {
}; };
// polls for the event lineage // polls for the event lineage
var pollLineage = function (nextDelay) { var pollLineage = function () {
getLineage(lineage).done(function (response) { getLineage(lineage).done(function (response) {
lineage = response.lineage; lineage = response.lineage;
// process the lineage, if its not done computing wait delay seconds before checking again // process the lineage
processLineage(nextDelay); processLineage();
}).fail(closeDialog); }).fail(closeDialog);
}; };
// processes the event lineage // processes the event lineage
var processLineage = function (delay) { var processLineage = function () {
// if the request was cancelled just ignore the current response // if the request was cancelled just ignore the current response
if (cancelled === true) { if (cancelled === true) {
closeDialog(); closeDialog();
@ -907,13 +871,9 @@ nf.ng.ProvenanceLineage = function () {
// clear the timer since we've been invoked // clear the timer since we've been invoked
lineageTimer = null; lineageTimer = null;
// calculate the next delay (back off)
var backoff = delay * 2;
var nextDelay = backoff > provenanceTableCtrl.MAX_DELAY ? provenanceTableCtrl.MAX_DELAY : backoff;
// for the lineage // for the lineage
pollLineage(nextDelay); pollLineage();
}, delay * 1000); }, 2000);
} }
}; };
@ -934,7 +894,7 @@ nf.ng.ProvenanceLineage = function () {
// collapses the lineage for the specified event in the specified direction // collapses the lineage for the specified event in the specified direction
var collapseLineage = function (eventId, provenanceTableCtrl) { var collapseLineage = function (eventId, provenanceTableCtrl) {
// get the event in question and collapse in the appropriate direction // get the event in question and collapse in the appropriate direction
getEventDetails(eventId, clusterNodeId).done(function (response) { provenanceTableCtrl.getEventDetails(eventId, clusterNodeId).done(function (response) {
var provenanceEvent = response.provenanceEvent; var provenanceEvent = response.provenanceEvent;
var eventUuid = provenanceEvent.flowFileUuid; var eventUuid = provenanceEvent.flowFileUuid;
var eventUuids = d3.set(provenanceEvent.childUuids); var eventUuids = d3.set(provenanceEvent.childUuids);
@ -1371,18 +1331,17 @@ nf.ng.ProvenanceLineage = function () {
$('#lineage-query-dialog').modal('hide'); $('#lineage-query-dialog').modal('hide');
}; };
// polls the server for the status of the lineage, if the lineage is not // polls the server for the status of the lineage
// done wait nextDelay seconds before trying again var pollLineage = function (provenanceTableCtrl) {
var pollLineage = function (nextDelay, provenanceTableCtrl) {
getLineage(lineage).done(function (response) { getLineage(lineage).done(function (response) {
lineage = response.lineage; lineage = response.lineage;
// process the lineage, if its not done computing wait delay seconds before checking again // process the lineage
processLineage(nextDelay, provenanceTableCtrl); processLineage(provenanceTableCtrl);
}).fail(closeDialog); }).fail(closeDialog);
}; };
var processLineage = function (delay, provenanceTableCtrl) { var processLineage = function (provenanceTableCtrl) {
// if the request was cancelled just ignore the current response // if the request was cancelled just ignore the current response
if (cancelled === true) { if (cancelled === true) {
closeDialog(); closeDialog();
@ -1417,13 +1376,9 @@ nf.ng.ProvenanceLineage = function () {
// clear the timer since we've been invoked // clear the timer since we've been invoked
lineageTimer = null; lineageTimer = null;
// calculate the next delay (back off)
var backoff = delay * 2;
var nextDelay = backoff > provenanceTableCtrl.MAX_DELAY ? provenanceTableCtrl.MAX_DELAY : backoff;
// poll lineage // poll lineage
pollLineage(nextDelay, provenanceTableCtrl); pollLineage(provenanceTableCtrl);
}, delay * 1000); }, 2000);
} }
}; };
@ -1432,7 +1387,7 @@ nf.ng.ProvenanceLineage = function () {
lineage = response.lineage; lineage = response.lineage;
// process the results, if they are not done wait 1 second before trying again // process the results, if they are not done wait 1 second before trying again
processLineage(1, provenanceTableCtrl); processLineage(provenanceTableCtrl);
}).fail(closeDialog); }).fail(closeDialog);
} }
} }

View File

@ -34,7 +34,7 @@ nf.ng.ProvenanceTable = function (provenanceLineageCtrl) {
searchOptions: '../nifi-api/provenance/search-options', searchOptions: '../nifi-api/provenance/search-options',
replays: '../nifi-api/provenance-events/replays', replays: '../nifi-api/provenance-events/replays',
provenance: '../nifi-api/provenance', provenance: '../nifi-api/provenance',
provenanceEvents: '../nifi-api/provenance-events', provenanceEvents: '../nifi-api/provenance-events/',
clusterSearch: '../nifi-api/flow/cluster/search-results', clusterSearch: '../nifi-api/flow/cluster/search-results',
d3Script: 'js/d3/d3.min.js', d3Script: 'js/d3/d3.min.js',
lineageScript: 'js/nf/provenance/nf-provenance-lineage.js', lineageScript: 'js/nf/provenance/nf-provenance-lineage.js',
@ -57,7 +57,7 @@ nf.ng.ProvenanceTable = function (provenanceLineageCtrl) {
var eventId = $('#provenance-event-id').text(); var eventId = $('#provenance-event-id').text();
// build the url // build the url
var dataUri = config.urls.provenanceEvents + '/' + encodeURIComponent(eventId) + '/content/' + encodeURIComponent(direction); var dataUri = config.urls.provenanceEvents + encodeURIComponent(eventId) + '/content/' + encodeURIComponent(direction);
// perform the request once we've received a token // perform the request once we've received a token
nf.Common.getAccessToken(config.urls.downloadToken).done(function (downloadToken) { nf.Common.getAccessToken(config.urls.downloadToken).done(function (downloadToken) {
@ -736,7 +736,7 @@ nf.ng.ProvenanceTable = function (provenanceLineageCtrl) {
} }
} else if (provenanceGrid.getColumns()[args.cell].id === 'moreDetails') { } else if (provenanceGrid.getColumns()[args.cell].id === 'moreDetails') {
if (target.hasClass('show-event-details')) { if (target.hasClass('show-event-details')) {
provenanceTableCtrl.showEventDetails(item); provenanceTableCtrl.showEventDetails(item.eventId, item.clusterNodeId);
} }
} }
}); });
@ -864,7 +864,9 @@ nf.ng.ProvenanceTable = function (provenanceLineageCtrl) {
var provenanceEntity = { var provenanceEntity = {
'provenance': { 'provenance': {
'request': $.extend({ 'request': $.extend({
maxResults: config.maxResults maxResults: config.maxResults,
summarize: true,
incrementalResults: false
}, provenance) }, provenance)
} }
}; };
@ -889,7 +891,14 @@ nf.ng.ProvenanceTable = function (provenanceLineageCtrl) {
var url = provenance.uri; var url = provenance.uri;
if (nf.Common.isDefinedAndNotNull(provenance.request.clusterNodeId)) { if (nf.Common.isDefinedAndNotNull(provenance.request.clusterNodeId)) {
url += '?' + $.param({ url += '?' + $.param({
clusterNodeId: provenance.request.clusterNodeId clusterNodeId: provenance.request.clusterNodeId,
summarize: true,
incrementalResults: false
});
} else {
url += '?' + $.param({
summarize: true,
incrementalResults: false
}); });
} }
@ -1003,11 +1012,6 @@ nf.ng.ProvenanceTable = function (provenanceLineageCtrl) {
function ProvenanceTableCtrl() { function ProvenanceTableCtrl() {
/**
* The max delay between requests.
*/
this.MAX_DELAY = 4;
/** /**
* The server time offset * The server time offset
*/ */
@ -1147,21 +1151,19 @@ nf.ng.ProvenanceTable = function (provenanceLineageCtrl) {
$('#provenance-query-dialog').modal('hide'); $('#provenance-query-dialog').modal('hide');
}; };
// polls the server for the status of the provenance, if the provenance is not // polls the server for the status of the provenance
// done wait nextDelay seconds before trying again var pollProvenance = function () {
var pollProvenance = function (nextDelay) {
getProvenance(provenance).done(function (response) { getProvenance(provenance).done(function (response) {
// update the provenance // update the provenance
provenance = response.provenance; provenance = response.provenance;
// process the provenance // process the provenance
processProvenanceResponse(nextDelay); processProvenanceResponse();
}).fail(closeDialog); }).fail(closeDialog);
}; };
// processes the provenance, if the provenance is not done wait delay // processes the provenance
// before polling again var processProvenanceResponse = function () {
var processProvenanceResponse = function (delay) {
// if the request was cancelled just ignore the current response // if the request was cancelled just ignore the current response
if (cancelled === true) { if (cancelled === true) {
closeDialog(); closeDialog();
@ -1193,13 +1195,9 @@ nf.ng.ProvenanceTable = function (provenanceLineageCtrl) {
// clear the timer since we've been invoked // clear the timer since we've been invoked
provenanceTimer = null; provenanceTimer = null;
// calculate the next delay (back off)
var backoff = delay * 2;
var nextDelay = backoff > self.MAX_DELAY ? self.MAX_DELAY : backoff;
// poll provenance // poll provenance
pollProvenance(nextDelay); pollProvenance();
}, delay * 1000); }, 2000);
} }
}; };
@ -1209,223 +1207,251 @@ nf.ng.ProvenanceTable = function (provenanceLineageCtrl) {
provenance = response.provenance; provenance = response.provenance;
// process the results, if they are not done wait 1 second before trying again // process the results, if they are not done wait 1 second before trying again
processProvenanceResponse(1); processProvenanceResponse();
}).fail(closeDialog); }).fail(closeDialog);
}, },
/**
* Gets the details for the specified event.
*
* @param {string} eventId
* @param {string} clusterNodeId The id of the node in the cluster where this event/flowfile originated
*/
getEventDetails: function (eventId, clusterNodeId) {
var url;
if (nf.Common.isDefinedAndNotNull(clusterNodeId)) {
url = config.urls.provenanceEvents + encodeURIComponent(eventId) + '?' + $.param({
clusterNodeId: clusterNodeId
});
} else {
url = config.urls.provenanceEvents + encodeURIComponent(eventId);
}
return $.ajax({
type: 'GET',
url: url,
dataType: 'json'
}).fail(nf.Common.handleAjaxError);
},
/** /**
* Shows the details for the specified action. * Shows the details for the specified action.
* *
* @param {object} event * @param {string} eventId
* @param {string} clusterNodeId The id of the node in the cluster where this event/flowfile originated
*/ */
showEventDetails: function (event) { showEventDetails: function (eventId, clusterNodeId) {
// update the event details provenanceTableCtrl.getEventDetails(eventId, clusterNodeId).done(function (response) {
$('#provenance-event-id').text(event.eventId); var event = response.provenanceEvent;
$('#provenance-event-time').html(nf.Common.formatValue(event.eventTime)).ellipsis();
$('#provenance-event-type').html(nf.Common.formatValue(event.eventType)).ellipsis();
$('#provenance-event-flowfile-uuid').html(nf.Common.formatValue(event.flowFileUuid)).ellipsis();
$('#provenance-event-component-id').html(nf.Common.formatValue(event.componentId)).ellipsis();
$('#provenance-event-component-name').html(nf.Common.formatValue(event.componentName)).ellipsis();
$('#provenance-event-component-type').html(nf.Common.formatValue(event.componentType)).ellipsis();
$('#provenance-event-details').html(nf.Common.formatValue(event.details)).ellipsis();
// over the default tooltip with the actual byte count // update the event details
var fileSize = $('#provenance-event-file-size').html(nf.Common.formatValue(event.fileSize)).ellipsis(); $('#provenance-event-id').text(event.eventId);
fileSize.attr('title', nf.Common.formatInteger(event.fileSizeBytes) + ' bytes'); $('#provenance-event-time').html(nf.Common.formatValue(event.eventTime)).ellipsis();
$('#provenance-event-type').html(nf.Common.formatValue(event.eventType)).ellipsis();
$('#provenance-event-flowfile-uuid').html(nf.Common.formatValue(event.flowFileUuid)).ellipsis();
$('#provenance-event-component-id').html(nf.Common.formatValue(event.componentId)).ellipsis();
$('#provenance-event-component-name').html(nf.Common.formatValue(event.componentName)).ellipsis();
$('#provenance-event-component-type').html(nf.Common.formatValue(event.componentType)).ellipsis();
$('#provenance-event-details').html(nf.Common.formatValue(event.details)).ellipsis();
// sets an duration // over the default tooltip with the actual byte count
var setDuration = function (field, value) { var fileSize = $('#provenance-event-file-size').html(nf.Common.formatValue(event.fileSize)).ellipsis();
if (nf.Common.isDefinedAndNotNull(value)) { fileSize.attr('title', nf.Common.formatInteger(event.fileSizeBytes) + ' bytes');
if (value === 0) {
field.text('< 1ms'); // sets an duration
var setDuration = function (field, value) {
if (nf.Common.isDefinedAndNotNull(value)) {
if (value === 0) {
field.text('< 1ms');
} else {
field.text(nf.Common.formatDuration(value));
}
} else { } else {
field.text(nf.Common.formatDuration(value)); field.html('<span class="unset">No value set</span>');
}
};
// handle durations
setDuration($('#provenance-event-duration'), event.eventDuration);
setDuration($('#provenance-lineage-duration'), event.lineageDuration);
// formats an event detail
var formatEventDetail = function (label, value) {
$('<div class="event-detail"></div>').append(
$('<div class="detail-name"></div>').text(label)).append(
$('<div class="detail-value">' + nf.Common.formatValue(value) + '</div>').ellipsis()).append(
$('<div class="clear"></div>')).appendTo('#additional-provenance-details');
};
// conditionally show RECEIVE details
if (event.eventType === 'RECEIVE') {
formatEventDetail('Source FlowFile Id', event.sourceSystemFlowFileId);
formatEventDetail('Transit Uri', event.transitUri);
}
// conditionally show SEND details
if (event.eventType === 'SEND') {
formatEventDetail('Transit Uri', event.transitUri);
}
// conditionally show ADDINFO details
if (event.eventType === 'ADDINFO') {
formatEventDetail('Alternate Identifier Uri', event.alternateIdentifierUri);
}
// conditionally show ROUTE details
if (event.eventType === 'ROUTE') {
formatEventDetail('Relationship', event.relationship);
}
// conditionally show FETCH details
if (event.eventType === 'FETCH') {
formatEventDetail('Transit Uri', event.transitUri);
}
// conditionally show the cluster node identifier
if (nf.Common.isDefinedAndNotNull(event.clusterNodeId)) {
// save the cluster node id
$('#provenance-event-cluster-node-id').text(event.clusterNodeId);
// render the cluster node address
formatEventDetail('Node Address', event.clusterNodeAddress);
}
// populate the parent/child flowfile uuids
var parentUuids = $('#parent-flowfiles-container');
var childUuids = $('#child-flowfiles-container');
// handle parent flowfiles
if (nf.Common.isEmpty(event.parentUuids)) {
$('#parent-flowfile-count').text(0);
parentUuids.append('<span class="unset">No parents</span>');
} else {
$('#parent-flowfile-count').text(event.parentUuids.length);
$.each(event.parentUuids, function (_, uuid) {
$('<div></div>').text(uuid).appendTo(parentUuids);
});
}
// handle child flowfiles
if (nf.Common.isEmpty(event.childUuids)) {
$('#child-flowfile-count').text(0);
childUuids.append('<span class="unset">No children</span>');
} else {
$('#child-flowfile-count').text(event.childUuids.length);
$.each(event.childUuids, function (_, uuid) {
$('<div></div>').text(uuid).appendTo(childUuids);
});
}
// get the attributes container
var attributesContainer = $('#attributes-container');
// get any action details
$.each(event.attributes, function (_, attribute) {
// create the attribute record
var attributeRecord = $('<div class="attribute-detail"></div>')
.append($('<div class="attribute-name">' + nf.Common.formatValue(attribute.name) + '</div>').ellipsis())
.appendTo(attributesContainer);
// add the current value
attributeRecord
.append($('<div class="attribute-value">' + nf.Common.formatValue(attribute.value) + '</div>').ellipsis())
.append('<div class="clear"></div>');
// show the previous value if the property has changed
if (attribute.value !== attribute.previousValue) {
if (nf.Common.isDefinedAndNotNull(attribute.previousValue)) {
attributeRecord
.append($('<div class="modified-attribute-value">' + nf.Common.formatValue(attribute.previousValue) + '<span class="unset"> (previous)</span></div>').ellipsis())
.append('<div class="clear"></div>');
} else {
attributeRecord
.append($('<div class="unset" style="font-size: 13px; padding-top: 2px;">' + nf.Common.formatValue(attribute.previousValue) + '</div>').ellipsis())
.append('<div class="clear"></div>');
}
} else {
// mark this attribute as not modified
attributeRecord.addClass('attribute-unmodified');
}
});
var formatContentValue = function (element, value) {
if (nf.Common.isDefinedAndNotNull(value)) {
element.removeClass('unset').text(value);
} else {
element.addClass('unset').text('No value previously set');
}
};
// content
$('#input-content-header').text('Input Claim');
formatContentValue($('#input-content-container'), event.inputContentClaimContainer);
formatContentValue($('#input-content-section'), event.inputContentClaimSection);
formatContentValue($('#input-content-identifier'), event.inputContentClaimIdentifier);
formatContentValue($('#input-content-offset'), event.inputContentClaimOffset);
formatContentValue($('#input-content-bytes'), event.inputContentClaimFileSizeBytes);
// input content file size
var inputContentSize = $('#input-content-size');
formatContentValue(inputContentSize, event.inputContentClaimFileSize);
if (nf.Common.isDefinedAndNotNull(event.inputContentClaimFileSize)) {
// over the default tooltip with the actual byte count
inputContentSize.attr('title', nf.Common.formatInteger(event.inputContentClaimFileSizeBytes) + ' bytes');
}
formatContentValue($('#output-content-container'), event.outputContentClaimContainer);
formatContentValue($('#output-content-section'), event.outputContentClaimSection);
formatContentValue($('#output-content-identifier'), event.outputContentClaimIdentifier);
formatContentValue($('#output-content-offset'), event.outputContentClaimOffset);
formatContentValue($('#output-content-bytes'), event.outputContentClaimFileSizeBytes);
// output content file size
var outputContentSize = $('#output-content-size');
formatContentValue(outputContentSize, event.outputContentClaimFileSize);
if (nf.Common.isDefinedAndNotNull(event.outputContentClaimFileSize)) {
// over the default tooltip with the actual byte count
outputContentSize.attr('title', nf.Common.formatInteger(event.outputContentClaimFileSizeBytes) + ' bytes');
}
if (event.inputContentAvailable === true) {
$('#input-content-download').show();
if (nf.Common.isContentViewConfigured()) {
$('#input-content-view').show();
} else {
$('#input-content-view').hide();
} }
} else { } else {
field.html('<span class="unset">No value set</span>'); $('#input-content-download').hide();
}
};
// handle durations
setDuration($('#provenance-event-duration'), event.eventDuration);
setDuration($('#provenance-lineage-duration'), event.lineageDuration);
// formats an event detail
var formatEventDetail = function (label, value) {
$('<div class="event-detail"></div>').append(
$('<div class="detail-name"></div>').text(label)).append(
$('<div class="detail-value">' + nf.Common.formatValue(value) + '</div>').ellipsis()).append(
$('<div class="clear"></div>')).appendTo('#additional-provenance-details');
};
// conditionally show RECEIVE details
if (event.eventType === 'RECEIVE') {
formatEventDetail('Source FlowFile Id', event.sourceSystemFlowFileId);
formatEventDetail('Transit Uri', event.transitUri);
}
// conditionally show SEND details
if (event.eventType === 'SEND') {
formatEventDetail('Transit Uri', event.transitUri);
}
// conditionally show ADDINFO details
if (event.eventType === 'ADDINFO') {
formatEventDetail('Alternate Identifier Uri', event.alternateIdentifierUri);
}
// conditionally show ROUTE details
if (event.eventType === 'ROUTE') {
formatEventDetail('Relationship', event.relationship);
}
// conditionally show FETCH details
if (event.eventType === 'FETCH') {
formatEventDetail('Transit Uri', event.transitUri);
}
// conditionally show the cluster node identifier
if (nf.Common.isDefinedAndNotNull(event.clusterNodeId)) {
// save the cluster node id
$('#provenance-event-cluster-node-id').text(event.clusterNodeId);
// render the cluster node address
formatEventDetail('Node Address', event.clusterNodeAddress);
}
// populate the parent/child flowfile uuids
var parentUuids = $('#parent-flowfiles-container');
var childUuids = $('#child-flowfiles-container');
// handle parent flowfiles
if (nf.Common.isEmpty(event.parentUuids)) {
$('#parent-flowfile-count').text(0);
parentUuids.append('<span class="unset">No parents</span>');
} else {
$('#parent-flowfile-count').text(event.parentUuids.length);
$.each(event.parentUuids, function (_, uuid) {
$('<div></div>').text(uuid).appendTo(parentUuids);
});
}
// handle child flowfiles
if (nf.Common.isEmpty(event.childUuids)) {
$('#child-flowfile-count').text(0);
childUuids.append('<span class="unset">No children</span>');
} else {
$('#child-flowfile-count').text(event.childUuids.length);
$.each(event.childUuids, function (_, uuid) {
$('<div></div>').text(uuid).appendTo(childUuids);
});
}
// get the attributes container
var attributesContainer = $('#attributes-container');
// get any action details
$.each(event.attributes, function (_, attribute) {
// create the attribute record
var attributeRecord = $('<div class="attribute-detail"></div>')
.append($('<div class="attribute-name">' + nf.Common.formatValue(attribute.name) + '</div>').ellipsis())
.appendTo(attributesContainer);
// add the current value
attributeRecord
.append($('<div class="attribute-value">' + nf.Common.formatValue(attribute.value) + '</div>').ellipsis())
.append('<div class="clear"></div>');
// show the previous value if the property has changed
if (attribute.value !== attribute.previousValue) {
if (nf.Common.isDefinedAndNotNull(attribute.previousValue)) {
attributeRecord
.append($('<div class="modified-attribute-value">' + nf.Common.formatValue(attribute.previousValue) + '<span class="unset"> (previous)</span></div>').ellipsis())
.append('<div class="clear"></div>');
} else {
attributeRecord
.append($('<div class="unset" style="font-size: 13px; padding-top: 2px;">' + nf.Common.formatValue(attribute.previousValue) + '</div>').ellipsis())
.append('<div class="clear"></div>');
}
} else {
// mark this attribute as not modified
attributeRecord.addClass('attribute-unmodified');
}
});
var formatContentValue = function (element, value) {
if (nf.Common.isDefinedAndNotNull(value)) {
element.removeClass('unset').text(value);
} else {
element.addClass('unset').text('No value previously set');
}
};
// content
$('#input-content-header').text('Input Claim');
formatContentValue($('#input-content-container'), event.inputContentClaimContainer);
formatContentValue($('#input-content-section'), event.inputContentClaimSection);
formatContentValue($('#input-content-identifier'), event.inputContentClaimIdentifier);
formatContentValue($('#input-content-offset'), event.inputContentClaimOffset);
formatContentValue($('#input-content-bytes'), event.inputContentClaimFileSizeBytes);
// input content file size
var inputContentSize = $('#input-content-size');
formatContentValue(inputContentSize, event.inputContentClaimFileSize);
if (nf.Common.isDefinedAndNotNull(event.inputContentClaimFileSize)) {
// over the default tooltip with the actual byte count
inputContentSize.attr('title', nf.Common.formatInteger(event.inputContentClaimFileSizeBytes) + ' bytes');
}
formatContentValue($('#output-content-container'), event.outputContentClaimContainer);
formatContentValue($('#output-content-section'), event.outputContentClaimSection);
formatContentValue($('#output-content-identifier'), event.outputContentClaimIdentifier);
formatContentValue($('#output-content-offset'), event.outputContentClaimOffset);
formatContentValue($('#output-content-bytes'), event.outputContentClaimFileSizeBytes);
// output content file size
var outputContentSize = $('#output-content-size');
formatContentValue(outputContentSize, event.outputContentClaimFileSize);
if (nf.Common.isDefinedAndNotNull(event.outputContentClaimFileSize)) {
// over the default tooltip with the actual byte count
outputContentSize.attr('title', nf.Common.formatInteger(event.outputContentClaimFileSizeBytes) + ' bytes');
}
if (event.inputContentAvailable === true) {
$('#input-content-download').show();
if (nf.Common.isContentViewConfigured()) {
$('#input-content-view').show();
} else {
$('#input-content-view').hide(); $('#input-content-view').hide();
} }
} else {
$('#input-content-download').hide();
$('#input-content-view').hide();
}
if (event.outputContentAvailable === true) { if (event.outputContentAvailable === true) {
$('#output-content-download').show(); $('#output-content-download').show();
if (nf.Common.isContentViewConfigured()) { if (nf.Common.isContentViewConfigured()) {
$('#output-content-view').show(); $('#output-content-view').show();
} else {
$('#output-content-view').hide();
}
} else { } else {
$('#output-content-download').hide();
$('#output-content-view').hide(); $('#output-content-view').hide();
} }
} else {
$('#output-content-download').hide();
$('#output-content-view').hide();
}
if (event.replayAvailable === true) { if (event.replayAvailable === true) {
$('#replay-content, #replay-content-connection').show(); $('#replay-content, #replay-content-connection').show();
formatContentValue($('#replay-connection-id'), event.sourceConnectionIdentifier); formatContentValue($('#replay-connection-id'), event.sourceConnectionIdentifier);
$('#replay-content-message').hide(); $('#replay-content-message').hide();
} else { } else {
$('#replay-content, #replay-content-connection').hide(); $('#replay-content, #replay-content-connection').hide();
$('#replay-content-message').text(event.replayExplanation).show(); $('#replay-content-message').text(event.replayExplanation).show();
} }
// show the dialog // show the dialog
$('#event-details-dialog').modal('show'); $('#event-details-dialog').modal('show');
});
} }
} }

View File

@ -83,7 +83,7 @@ nf.ng.Provenance = function (provenanceTableCtrl) {
*/ */
var loadAbout = function () { var loadAbout = function () {
// get the about details // get the about details
$.ajax({ return $.ajax({
type: 'GET', type: 'GET',
url: config.urls.about, url: config.urls.about,
dataType: 'json' dataType: 'json'