NIFI-7738 Reverse Provenance Query

This closes #4563.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Nissim Shiman 2020-09-15 17:00:24 +00:00 committed by Mark Payne
parent 382439c1d0
commit 3cc8d767b3
19 changed files with 386 additions and 85 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 29 KiB

After

Width:  |  Height:  |  Size: 43 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 30 KiB

After

Width:  |  Height:  |  Size: 44 KiB

View File

@ -2742,11 +2742,14 @@ image:search-events.png["Search Events"]
For example, to determine if a particular FlowFile was received, search for an Event Type of "RECEIVE" and include an
identifier for the FlowFile, such as its uuid or filename. The asterisk (*) may be used as a wildcard for any number of characters.
So, to determine whether a FlowFile with "ABC" anywhere in its filename was received at any time on Jan. 6, 2015, the search shown in the following
So, to determine whether a FlowFile with "ABC" anywhere in its filename was received at any time on July 29, 2016, the search shown in the following
image could be performed:
image:search-receive-event-abc.png["Search for RECEIVE Event"]
If all filenames that do not have "ABC" anywhere in the filename is desired, then click the checkbox with the label "Exclude from search results" beneath
this entry before performing the search.
[[event_details]]
=== Details of an Event
In the far-left column of the Data Provenance page, there is a "View Details" icon for each event (image:iconDetails.png["Details"]).

View File

@ -21,4 +21,6 @@ public interface SearchTerm {
SearchableField getSearchableField();
String getValue();
Boolean isInverted();
}

View File

@ -18,7 +18,7 @@ package org.apache.nifi.provenance.search;
public class SearchTerms {
public static SearchTerm newSearchTerm(final SearchableField field, final String value) {
public static SearchTerm newSearchTerm(final SearchableField field, final String value, final Boolean inverted) {
return new SearchTerm() {
@Override
public SearchableField getSearchableField() {
@ -34,6 +34,14 @@ public class SearchTerms {
public String toString() {
return getValue();
}
@Override
public Boolean isInverted() {
if (inverted == null) {
return Boolean.FALSE;
}
return inverted;
}
};
}
}

View File

@ -30,7 +30,7 @@ import java.util.Map;
@XmlType(name = "provenanceRequest")
public class ProvenanceRequestDTO {
private Map<String, String> searchTerms;
private Map<String, ProvenanceSearchValueDTO> searchTerms;
private String clusterNodeId;
private Date startDate;
private Date endDate;
@ -47,11 +47,11 @@ public class ProvenanceRequestDTO {
@ApiModelProperty(
value = "The search terms used to perform the search."
)
public Map<String, String> getSearchTerms() {
public Map<String, ProvenanceSearchValueDTO> getSearchTerms() {
return searchTerms;
}
public void setSearchTerms(final Map<String, String> searchTerms) {
public void setSearchTerms(final Map<String, ProvenanceSearchValueDTO> searchTerms) {
this.searchTerms = searchTerms;
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto.provenance;
import io.swagger.annotations.ApiModelProperty;
import javax.xml.bind.annotation.XmlType;
/**
* Provenance value used to query
*/
@XmlType(name = "provenanceSearchValue")
public class ProvenanceSearchValueDTO {
private String value;
private Boolean inverse;
/**
* @return the search value
*/
@ApiModelProperty(
value = "The search value."
)
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
/**
* @return whether inverse of search value should be searched or not
*/
@ApiModelProperty(
value = "Query for all except for search value."
)
public Boolean getInverse() {
return inverse;
}
public void setInverse(Boolean inverse) {
this.inverse = inverse;
}
}

View File

@ -102,6 +102,7 @@ import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceResultsDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceSearchableFieldDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceSearchValueDTO;
import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO;
import org.apache.nifi.web.api.dto.provenance.lineage.LineageRequestDTO;
import org.apache.nifi.web.api.dto.provenance.lineage.LineageRequestDTO.LineageRequestType;
@ -1037,16 +1038,16 @@ public class ControllerFacade implements Authorizable {
// if the request was specified
if (requestDto != null) {
// add each search term specified
final Map<String, String> searchTerms = requestDto.getSearchTerms();
final Map<String, ProvenanceSearchValueDTO> searchTerms = requestDto.getSearchTerms();
if (searchTerms != null) {
for (final Map.Entry<String, String> searchTerm : searchTerms.entrySet()) {
for (final Map.Entry<String, ProvenanceSearchValueDTO> searchTerm : searchTerms.entrySet()) {
SearchableField field;
field = SearchableFields.getSearchableField(searchTerm.getKey());
if (field == null) {
field = SearchableFields.newSearchableAttribute(searchTerm.getKey());
}
query.addSearchTerm(SearchTerms.newSearchTerm(field, searchTerm.getValue()));
query.addSearchTerm(SearchTerms.newSearchTerm(field, searchTerm.getValue().getValue(), searchTerm.getValue().getInverse()));
}
}
@ -1113,9 +1114,12 @@ public class ControllerFacade implements Authorizable {
requestDto.setMaximumFileSize(query.getMaxFileSize());
requestDto.setMaxResults(query.getMaxResults());
if (query.getSearchTerms() != null) {
final Map<String, String> searchTerms = new HashMap<>();
final Map<String, ProvenanceSearchValueDTO> searchTerms = new HashMap<>();
for (final SearchTerm searchTerm : query.getSearchTerms()) {
searchTerms.put(searchTerm.getSearchableField().getFriendlyName(), searchTerm.getValue());
final ProvenanceSearchValueDTO searchValueDTO = new ProvenanceSearchValueDTO();
searchValueDTO.setValue(searchTerm.getValue());
searchValueDTO.setInverse(searchTerm.isInverted());
searchTerms.put(searchTerm.getSearchableField().getFriendlyName(), searchValueDTO);
}
requestDto.setSearchTerms(searchTerms);
}

View File

@ -18,7 +18,6 @@
<div id="provenance-search-dialog" class="hidden medium-dialog">
<div class="dialog-content">
<div class="setting">
<div class="setting-name">Fields</div>
<div class="setting-field">
<div id="searchable-fields-container">
<div id="no-searchable-fields" class="unset">No searchable fields have been configured.</div>

View File

@ -129,18 +129,31 @@ div.provenance-panel {
}
div.searchable-field {
margin-bottom: 5px;
margin-bottom: 15px;
}
div.searchable-field-name {
width: 25%;
float: left;
line-height: 26px;
}
div.searchable-field-value {
float: left;
width: 75%;
div.searchable-checkbox-value {
margin-top: 3px;
}
div.searchable-checkbox-label {
float: left;
margin-top: 2px;
font-size: 12px;
font-family: Roboto;
font-weight: 500;
text-transform: none;
color: #775351;
}
div.searchable-checkbox-tooltip {
float: left;
margin-top: 2px;
margin-bottom: 11px;
}
.start-date-setting, .start-time-setting, .end-date-setting, .end-time-setting {

View File

@ -508,8 +508,11 @@
var appendSearchableField = function (field) {
var searchableField = $('<div class="searchable-field"></div>').appendTo('#searchable-fields-container');
$('<span class="searchable-field-id hidden"></span>').text(field.id).appendTo(searchableField);
$('<div class="searchable-field-name"></div>').text(field.label).appendTo(searchableField);
$('<div class="searchable-field-name setting-name"></div>').text(field.label).appendTo(searchableField);
$('<div class="searchable-field-value"><input type="text" class="searchable-field-input"/></div>').appendTo(searchableField);
$('<div class="searchable-checkbox-value nf-checkbox checkbox-unchecked"></div>').appendTo(searchableField);
$('<div class="searchable-checkbox-label nf-checkbox-label">Exclude from search results</div>').appendTo(searchableField);
$('<div class="searchable-checkbox-tooltip fa fa-question-circle" title="Query for all values except what is entered."></div>').appendTo(searchableField);
$('<div class="clear"></div>').appendTo(searchableField);
// make the searchable accessible for populating
@ -532,10 +535,20 @@
var searchableField = $(this);
var fieldId = searchableField.children('span.searchable-field-id').text();
var searchValue = $.trim(searchableField.find('input.searchable-field-input').val());
var searchDetails = {};
// if the field isn't blank include it in the search
if (!nfCommon.isBlank(searchValue)) {
searchCriteria[fieldId] = searchValue;
searchCriteria[fieldId] = searchDetails;
searchDetails["value"] = searchValue;
var inverse = "inverse";
var searchInverse = searchableField.find('div.searchable-checkbox-value').hasClass('checkbox-checked');
if (searchInverse == true)
{
searchDetails[inverse] = true;
} else {
searchDetails[inverse] = false;
}
}
});
return searchCriteria;

View File

@ -265,7 +265,11 @@
$('input.searchable-component-id').val(initialComponentId);
// build the search criteria
searchTerms['ProcessorID'] = initialComponentId;
var provenanceSearchValue = {};
provenanceSearchValue['value'] = initialComponentId;
provenanceSearchValue['inverse'] = false;
searchTerms['ProcessorID'] = provenanceSearchValue;
}
// look for a flowfile uuid in the query search
@ -275,7 +279,11 @@
$('input.searchable-flowfile-uuid').val(initialFlowFileUuid);
// build the search criteria
searchTerms['FlowFileUUID'] = initialFlowFileUuid;
var provenanceSearchValue = {};
provenanceSearchValue['value'] = initialFlowFileUuid;
provenanceSearchValue['inverse'] = false;
searchTerms['FlowFileUUID'] = provenanceSearchValue;
}
// load the provenance table

View File

@ -98,16 +98,28 @@ public class LuceneUtil {
}
final BooleanQuery.Builder queryBuilder = new BooleanQuery.Builder();
// there needs to always be an Occur.MUST (or Occur.SHOULD) in every apache lucene Boolean query
// See https://lucidworks.com/post/why-not-and-or-and-not/
// so we need to keep track of this until it is used
boolean occurMust = false;
for (final SearchTerm searchTerm : query.getSearchTerms()) {
final String searchValue = searchTerm.getValue();
if (searchValue == null) {
throw new IllegalArgumentException("Empty search value not allowed (for term '" + searchTerm.getSearchableField().getFriendlyName() + "')");
}
Occur occur = searchTerm.isInverted().booleanValue() ? BooleanClause.Occur.MUST_NOT : BooleanClause.Occur.MUST;
if (occur.equals(BooleanClause.Occur.MUST)) {
occurMust = true;
}
if (searchValue.contains("*") || searchValue.contains("?")) {
queryBuilder.add(new BooleanClause(new WildcardQuery(new Term(searchTerm.getSearchableField().getSearchableFieldName(), searchTerm.getValue().toLowerCase())), Occur.MUST));
queryBuilder.add(new BooleanClause(new WildcardQuery(new Term(searchTerm.getSearchableField().getSearchableFieldName(), searchTerm.getValue().toLowerCase())), occur));
} else {
queryBuilder.add(new BooleanClause(new TermQuery(new Term(searchTerm.getSearchableField().getSearchableFieldName(), searchTerm.getValue().toLowerCase())), Occur.MUST));
queryBuilder.add(new BooleanClause(new TermQuery(new Term(searchTerm.getSearchableField().getSearchableFieldName(), searchTerm.getValue().toLowerCase())), occur));
}
}
@ -115,12 +127,18 @@ public class LuceneUtil {
final long minBytes = query.getMinFileSize() == null ? 0L : DataUnit.parseDataSize(query.getMinFileSize(), DataUnit.B).longValue();
final long maxBytes = query.getMaxFileSize() == null ? Long.MAX_VALUE : DataUnit.parseDataSize(query.getMaxFileSize(), DataUnit.B).longValue();
queryBuilder.add(LongPoint.newRangeQuery(SearchableFields.FileSize.getSearchableFieldName(), minBytes, maxBytes), Occur.MUST);
occurMust = true;
}
if (query.getStartDate() != null || query.getEndDate() != null) {
final long minDateTime = query.getStartDate() == null ? 0L : query.getStartDate().getTime();
final long maxDateTime = query.getEndDate() == null ? Long.MAX_VALUE : query.getEndDate().getTime();
queryBuilder.add(LongPoint.newRangeQuery(SearchableFields.EventTime.getSearchableFieldName(), minDateTime, maxDateTime), Occur.MUST);
occurMust = true;
}
if (!occurMust) {
queryBuilder.add(new MatchAllDocsQuery(), Occur.SHOULD);
}
return queryBuilder.build();

View File

@ -446,7 +446,7 @@ public class ITestPersistentProvenanceRepository {
repo.waitForRollover();
final Query query = new Query(UUID.randomUUID().toString());
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "XXXX"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "XXXX", null));
query.setMaxResults(100);
final QueryResult result = repo.queryEvents(query, createUser());
@ -495,7 +495,7 @@ public class ITestPersistentProvenanceRepository {
repo.waitForRollover();
final Query query = new Query(UUID.randomUUID().toString());
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.newSearchableAttribute("immense"), "000*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.newSearchableAttribute("immense"), "000*", null));
query.setMaxResults(100);
final QueryResult result = repo.queryEvents(query, createUser());
@ -534,10 +534,10 @@ public class ITestPersistentProvenanceRepository {
repo.waitForRollover();
final Query query = new Query(UUID.randomUUID().toString());
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "000000*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "000000*", null));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*", null));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4", null));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*", null));
query.setMaxResults(100);
final QueryResult result = repo.queryEvents(query, createUser());
@ -620,9 +620,9 @@ public class ITestPersistentProvenanceRepository {
final Query query = new Query(UUID.randomUUID().toString());
// query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*", null));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4", null));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*", null));
query.setMaxResults(100);
final QueryResult result = repo.queryEvents(query, createUser());
@ -689,9 +689,9 @@ public class ITestPersistentProvenanceRepository {
}
final Query query = new Query(UUID.randomUUID().toString());
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*", null));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4", null));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*", null));
query.setMaxResults(100);
final QuerySubmission submission = repo.submitQuery(query, createUser());
@ -1065,7 +1065,7 @@ public class ITestPersistentProvenanceRepository {
File eventFile = this.prepCorruptedEventFileTests();
final Query query = new Query(UUID.randomUUID().toString());
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "foo-*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "foo-*", null));
query.setMaxResults(100);
DataOutputStream in = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(eventFile)));
@ -1087,7 +1087,7 @@ public class ITestPersistentProvenanceRepository {
File eventFile = this.prepCorruptedEventFileTests();
final Query query = new Query(UUID.randomUUID().toString());
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "foo-*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "foo-*", null));
query.setMaxResults(100);
DataOutputStream in = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(eventFile)));
in.close();
@ -1180,9 +1180,9 @@ public class ITestPersistentProvenanceRepository {
// verify we get the results expected
final Query query = new Query(UUID.randomUUID().toString());
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*", null));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4", null));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*", null));
query.setMaxResults(100);
final QueryResult result = repo.queryEvents(query, createUser());
@ -1355,7 +1355,7 @@ public class ITestPersistentProvenanceRepository {
repo.waitForRollover();
final Query query = new Query("1234");
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "1234"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "1234", null));
final QuerySubmission submission = repo.submitQuery(query, createUser());
final QueryResult result = submission.getResult();

View File

@ -35,6 +35,7 @@ import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.search.QueryResult;
import org.apache.nifi.provenance.search.QuerySubmission;
import org.apache.nifi.provenance.search.SearchTerms;
import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.provenance.serialization.StorageSummary;
import org.apache.nifi.provenance.store.ArrayListEventStore;
import org.apache.nifi.provenance.store.EventStore;
@ -507,7 +508,7 @@ public class TestLuceneEventIndex {
// Create a query that searches for the event with the FlowFile UUID equal to the first event's.
final Query query = new Query(UUID.randomUUID().toString());
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, event.getFlowFileUuid()));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, event.getFlowFileUuid(), null));
final ArrayListEventStore eventStore = new ArrayListEventStore();
eventStore.addEvent(event);
@ -536,6 +537,59 @@ public class TestLuceneEventIndex {
assertEquals(event, matchingEvents.get(0));
}
@Test(timeout = 5000)
public void testQueryInverseSpecificField() throws InterruptedException, IOException {
final List<SearchableField> searchableFields = new ArrayList<>();
searchableFields.add(SearchableFields.ComponentID);
searchableFields.add(SearchableFields.FlowFileUUID);
final RepositoryConfiguration repoConfig = createConfig();
repoConfig.setSearchableFields(searchableFields);
final IndexManager indexManager = new StandardIndexManager(repoConfig);
final LuceneEventIndex index = new LuceneEventIndex(repoConfig, indexManager, 2, EventReporter.NO_OP);
final ProvenanceEventRecord event1 = createEvent(System.currentTimeMillis(), "11111111-1111-1111-1111-111111111111", "component-1");
final ProvenanceEventRecord event2 = createEvent(System.currentTimeMillis(), "22222222-2222-2222-2222-222222222222", "component-2");
// add 2 events, one of which we *will not* query for.
index.addEvent(event1, new StorageSummary(event1.getEventId(), "1.prov", "1", 1, 2L, 2L));
index.addEvent(event2, new StorageSummary(event2.getEventId(), "1.prov", "1", 1, 2L, 2L));
// Create a query that searches for the event with the FlowFile UUID that is NOT equal to the first event's
final Query query = new Query(UUID.randomUUID().toString());
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, event1.getFlowFileUuid(), Boolean.TRUE));
final ArrayListEventStore eventStore = new ArrayListEventStore();
eventStore.addEvent(event1);
eventStore.addEvent(event2);
index.initialize(eventStore);
// We don't know how long it will take for the event to be indexed, so keep querying until
// we get a result. The test will timeout after 5 seconds if we've still not succeeded.
List<ProvenanceEventRecord> matchingEvents = Collections.emptyList();
while (matchingEvents.isEmpty()) {
final QuerySubmission submission = index.submitQuery(query, EventAuthorizer.GRANT_ALL, "unit test user");
assertNotNull(submission);
final QueryResult result = submission.getResult();
assertNotNull(result);
result.awaitCompletion(100, TimeUnit.MILLISECONDS);
assertTrue(result.isFinished());
assertNull(result.getError());
matchingEvents = result.getMatchingEvents();
assertNotNull(matchingEvents);
Thread.sleep(100L); // avoid crushing the CPU
}
assertEquals(1, matchingEvents.size());
assertEquals(event2, matchingEvents.get(0));
}
private RepositoryConfiguration createConfig() {
return createConfig(1);
}
@ -572,6 +626,10 @@ public class TestLuceneEventIndex {
}
private ProvenanceEventRecord createEvent(final long timestamp, final String uuid) {
return createEvent(timestamp, uuid, "component-1");
}
private ProvenanceEventRecord createEvent(final long timestamp, final String uuid, final String componentId) {
final Map<String, String> previousAttributes = new HashMap<>();
previousAttributes.put("uuid", uuid);
final Map<String, String> updatedAttributes = new HashMap<>();
@ -580,7 +638,7 @@ public class TestLuceneEventIndex {
final ProvenanceEventRecord event = new StandardProvenanceEventRecord.Builder()
.setEventType(ProvenanceEventType.CONTENT_MODIFIED)
.setAttributes(previousAttributes, updatedAttributes)
.setComponentId("component-1")
.setComponentId(componentId)
.setComponentType("unit test")
.setEventId(idGenerator.getAndIncrement())
.setEventTime(timestamp)

View File

@ -314,6 +314,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
for (final SearchTerm searchTerm : query.getSearchTerms()) {
final SearchableField searchableField = searchTerm.getSearchableField();
final String searchValue = searchTerm.getValue();
final boolean excludeSearchValue = searchTerm.isInverted().booleanValue();
if (searchableField.isAttribute()) {
final String attributeName = searchableField.getIdentifier();
@ -322,15 +323,22 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
if (searchValue.contains("?") || searchValue.contains("*")) {
if (eventAttributeValue == null || eventAttributeValue.isEmpty()) {
return false;
if (!excludeSearchValue) {
return false;
} else {
continue;
}
}
final String regex = searchValue.replace("?", ".").replace("*", ".*");
final Pattern pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
if (!pattern.matcher(eventAttributeValue).matches()) {
final boolean patternMatches = pattern.matcher(eventAttributeValue).matches();
if ((!patternMatches && !excludeSearchValue)
|| (patternMatches && excludeSearchValue)) {
return false;
}
} else if (!searchValue.equalsIgnoreCase(eventAttributeValue)) {
} else if (!searchValue.equalsIgnoreCase(eventAttributeValue) && !excludeSearchValue
|| searchValue.equalsIgnoreCase(eventAttributeValue) && excludeSearchValue) {
return false;
}
} else {
@ -339,29 +347,54 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
if (searchValue.contains("?") || searchValue.contains("*")) {
final String regex = searchValue.replace("?", ".").replace("*", ".*");
final Pattern pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
if (pattern.matcher(event.getFlowFileUuid()).matches()) {
final boolean patternMatches = pattern.matcher(event.getFlowFileUuid()).matches();
if (!excludeSearchValue) {
if (patternMatches) {
continue;
}
boolean found = false;
for (final String uuid : event.getParentUuids()) {
if (pattern.matcher(uuid).matches()) {
found = true;
break;
}
}
for (final String uuid : event.getChildUuids()) {
if (pattern.matcher(uuid).matches()) {
found = true;
break;
}
}
if (found) {
continue;
}
} else {
if (patternMatches) {
return false;
}
for (final String uuid : event.getParentUuids()) {
if (pattern.matcher(uuid).matches()) {
return false;
}
}
for (final String uuid : event.getChildUuids()) {
if (pattern.matcher(uuid).matches()) {
return false;
}
}
continue;
}
boolean found = false;
for (final String uuid : event.getParentUuids()) {
if (pattern.matcher(uuid).matches()) {
found = true;
break;
}
}
for (final String uuid : event.getChildUuids()) {
if (pattern.matcher(uuid).matches()) {
found = true;
break;
}
}
if (found) {
continue;
}
} else if (event.getFlowFileUuid().equals(searchValue) || event.getParentUuids().contains(searchValue) || event.getChildUuids().contains(searchValue)) {
} else if (!excludeSearchValue
&& (event.getFlowFileUuid().equals(searchValue) || event.getParentUuids().contains(searchValue) || event.getChildUuids().contains(searchValue))) {
continue;
} else if (excludeSearchValue
&& (!event.getFlowFileUuid().equals(searchValue) && !event.getParentUuids().contains(searchValue) && !event.getChildUuids().contains(searchValue))){
continue;
}
@ -370,16 +403,24 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
final Object fieldValue = getFieldValue(event, searchableField);
if (fieldValue == null) {
return false;
if (!excludeSearchValue) {
return false;
} else {
continue;
}
}
if (searchValue.contains("?") || searchValue.contains("*")) {
final String regex = searchValue.replace("?", ".").replace("*", ".*");
final Pattern pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
if (!pattern.matcher(String.valueOf(fieldValue)).matches()) {
final boolean patternMatches = pattern.matcher(String.valueOf(fieldValue)).matches();
if (!patternMatches && !excludeSearchValue
|| patternMatches && excludeSearchValue) {
return false;
}
} else if (!searchValue.equalsIgnoreCase(String.valueOf(fieldValue))) {
} else if (!searchValue.equalsIgnoreCase(String.valueOf(fieldValue)) && !excludeSearchValue
|| searchValue.equalsIgnoreCase(String.valueOf(fieldValue)) && excludeSearchValue) {
return false;
}
}
@ -606,6 +647,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
return maxSize - ringBuffer.getSize();
}
@Override
public String getContainerFileStoreName(String containerName) {
return null;
}

View File

@ -102,10 +102,10 @@ public class TestVolatileProvenanceRepository {
}
final Query query = new Query(UUID.randomUUID().toString());
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000*", null));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*", null));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4", null));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*", null));
query.setMaxResults(100);
final QuerySubmission submission = repo.submitQuery(query, createUser());
@ -119,6 +119,56 @@ public class TestVolatileProvenanceRepository {
}
}
@Test(timeout = 1000)
public void testSearchForInverseValue() throws InterruptedException {
repo = new VolatileProvenanceRepository(NiFiProperties.createBasicNiFiProperties(null));
final Map<String, String> attributes = new HashMap<>();
attributes.put("abc", "xyz");
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
builder.setEventTime(System.currentTimeMillis());
builder.setEventType(ProvenanceEventType.RECEIVE);
builder.setTransitUri("nifi://unit-test");
builder.setComponentId("1234");
builder.setComponentType("dummy processor");
final String uuid_prefix = "00000000-0000-0000-0000-000000000000";
for (int i = 0; i < 2; i++) {
attributes.put("uuid", uuid_prefix + i);
attributes.put("file.owner", "testOwner1");
builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
repo.registerEvent(builder.build());
}
for (int i = 2; i < 10; i++) {
attributes.put("uuid", uuid_prefix + i);
attributes.put("file.owner", "testOwner2");
builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
repo.registerEvent(builder.build());
}
final Query query = new Query(UUID.randomUUID().toString());
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "1234", null));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.newSearchableAttribute("abc"), "x?z", null));
// set up query to search for event with uuid NOT ending in *000 and file.owner NOT testOwner2 and testAttribute NOT fitting pattern of testAttributeValu?
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "*000", Boolean.TRUE));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.newSearchableAttribute("file.owner"), "testOwner2", Boolean.TRUE));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.newSearchableAttribute("testAttribute"), "testAttributeValu?", Boolean.TRUE));
query.setMaxResults(100);
final QuerySubmission submission = repo.submitQuery(query, createUser());
while (!submission.getResult().isFinished()) {
Thread.sleep(100L);
}
assertEquals(1, submission.getResult().getMatchingEvents().size());
assertEquals("00000000-0000-0000-0000-0000000000001", submission.getResult().getMatchingEvents().get(0).getFlowFileUuid());
}
private FlowFile createFlowFile(final long id, final long fileSize, final Map<String, String> attributes) {
final Map<String, String> attrCopy = new HashMap<>(attributes);

View File

@ -51,6 +51,7 @@ import org.apache.nifi.web.api.dto.flow.FlowDTO;
import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceSearchValueDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
import org.apache.nifi.web.api.entity.ConnectionEntity;
@ -870,8 +871,8 @@ public class NiFiClientUtil {
return nifiClient.getOutputPortClient().createOutputPort(groupId, outputPortEntity);
}
public ProvenanceEntity queryProvenance(final Map<SearchableField, String> searchTerms, final Long startTime, final Long endTime) throws NiFiClientException, IOException {
final Map<String, String> searchTermsAsStrings = searchTerms.entrySet().stream()
public ProvenanceEntity queryProvenance(final Map<SearchableField, ProvenanceSearchValueDTO> searchTerms, final Long startTime, final Long endTime) throws NiFiClientException, IOException {
final Map<String, ProvenanceSearchValueDTO> searchTermsAsStrings = searchTerms.entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey().getSearchableFieldName(), Map.Entry::getValue));
final ProvenanceRequestDTO requestDto = new ProvenanceRequestDTO();

View File

@ -22,6 +22,7 @@ import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceSearchValueDTO;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProvenanceEntity;
import org.junit.Test;
@ -67,7 +68,11 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT {
getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
getNifiClient().getProcessorClient().startProcessor(count);
final Map<SearchableField, String> searchTerms = Collections.singletonMap(SearchableFields.ComponentID, generateFlowFile.getId());
ProvenanceSearchValueDTO searchValueDto = new ProvenanceSearchValueDTO();
searchValueDto.setValue(generateFlowFile.getId());
searchValueDto.setInverse(false);
final Map<SearchableField, ProvenanceSearchValueDTO> searchTerms = Collections.singletonMap(SearchableFields.ComponentID, searchValueDto);
ProvenanceEntity provenanceEntity = getClientUtil().queryProvenance(searchTerms, null, null);
assertEquals(0, provenanceEntity.getProvenance().getResults().getProvenanceEvents().size());
@ -101,7 +106,11 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT {
generateFlowFile = getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
final Map<SearchableField, String> generateSearchTerms = Collections.singletonMap(SearchableFields.ComponentID, generateFlowFile.getId());
ProvenanceSearchValueDTO searchValueDto = new ProvenanceSearchValueDTO();
searchValueDto.setValue(generateFlowFile.getId());
searchValueDto.setInverse(false);
final Map<SearchableField, ProvenanceSearchValueDTO> generateSearchTerms = Collections.singletonMap(SearchableFields.ComponentID, searchValueDto);
// Wait for there to be at least 1000 events for Generate processor and then stop the processor
waitForEventCountAtLeast(generateSearchTerms, 800);
@ -112,7 +121,12 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT {
// roll over and subsequently be aged off. The second Event File will hold the other 600. So we may have 600 or 800 events,
// depending on when the query is executed.
getNifiClient().getProcessorClient().startProcessor(terminate);
final Map<SearchableField, String> terminateSearchTerms = Collections.singletonMap(SearchableFields.ComponentID, terminate.getId());
ProvenanceSearchValueDTO terminateSearchValueDto = new ProvenanceSearchValueDTO();
terminateSearchValueDto.setValue(terminate.getId());
terminateSearchValueDto.setInverse(false);
final Map<SearchableField, ProvenanceSearchValueDTO> terminateSearchTerms = Collections.singletonMap(SearchableFields.ComponentID, terminateSearchValueDto);
waitForEventCountAtLeast(terminateSearchTerms, 600);
waitForEventCountExactly(generateSearchTerms, 0);
@ -142,7 +156,11 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT {
generateFlowFile = getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
final Map<SearchableField, String> generateSearchTerms = Collections.singletonMap(SearchableFields.ComponentID, generateFlowFile.getId());
ProvenanceSearchValueDTO searchValueDto = new ProvenanceSearchValueDTO();
searchValueDto.setValue(generateFlowFile.getId());
searchValueDto.setInverse(false);
final Map<SearchableField, ProvenanceSearchValueDTO> generateSearchTerms = Collections.singletonMap(SearchableFields.ComponentID, searchValueDto);
// Wait for there to be at least 800 events for Generate processor and then stop it
waitForEventCountAtLeast(generateSearchTerms, 800);
@ -153,7 +171,12 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT {
// roll over and subsequently be aged off. The second Event File will hold the other 600. So we may have 600 or 800 events,
// depending on when the query is executed.
getNifiClient().getProcessorClient().startProcessor(terminate);
final Map<SearchableField, String> terminateSearchTerms = Collections.singletonMap(SearchableFields.ComponentID, terminate.getId());
ProvenanceSearchValueDTO terminateSearchValueDto = new ProvenanceSearchValueDTO();
terminateSearchValueDto.setValue(terminate.getId());
terminateSearchValueDto.setInverse(false);
final Map<SearchableField, ProvenanceSearchValueDTO> terminateSearchTerms = Collections.singletonMap(SearchableFields.ComponentID, terminateSearchValueDto);
waitForEventCountAtLeast(terminateSearchTerms, 600);
getNifiClient().getProcessorClient().stopProcessor(terminate);
@ -179,15 +202,15 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT {
waitForEventCountExactly(generateSearchTerms, 400);
}
private void waitForEventCountExactly(final Map<SearchableField, String> searchTerms, final int expectedCount) throws InterruptedException {
private void waitForEventCountExactly(final Map<SearchableField, ProvenanceSearchValueDTO> searchTerms, final int expectedCount) throws InterruptedException {
waitForEventCount(searchTerms, count -> count == expectedCount);
}
private void waitForEventCountAtLeast(final Map<SearchableField, String> searchTerms, final int expectedCount) throws InterruptedException {
private void waitForEventCountAtLeast(final Map<SearchableField, ProvenanceSearchValueDTO> searchTerms, final int expectedCount) throws InterruptedException {
waitForEventCount(searchTerms, count -> count >= expectedCount);
}
private void waitForEventCount(final Map<SearchableField, String> searchTerms, final Predicate<Integer> predicate) throws InterruptedException {
private void waitForEventCount(final Map<SearchableField, ProvenanceSearchValueDTO> searchTerms, final Predicate<Integer> predicate) throws InterruptedException {
// Wait for there to be at least 1000 events for Generate processor
waitFor(() -> {
try {
@ -198,7 +221,7 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT {
}, 500L);
}
private int getEventCount(final Map<SearchableField, String> searchTerms) throws NiFiClientException, IOException {
private int getEventCount(final Map<SearchableField, ProvenanceSearchValueDTO> searchTerms) throws NiFiClientException, IOException {
ProvenanceEntity provEntity = getClientUtil().queryProvenance(searchTerms, null, null);
return provEntity.getProvenance().getResults().getProvenanceEvents().size();
}