NIFI-7375: This closes #4218. Fixed a bug that caused Provenance Events not to show up in specific situations when clicking View Provenance for a Processor.

- Added System-level tests for the Provenance repository to reproduce the behavior.
- Added a Provenance Client to the CLI, which is necessary for the System-level tests.
- Added a small additional configuration option for the Provenance repository to simplify development of system tests.
- Minor improvements to system tests (such as the ability to destroy the environment between tests) needed for Provenance repository-based system tests.
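For reference, the two repository settings this change introduces look like this in nifi.properties form (values shown here are illustrative; the property names and defaults appear in the diffs below):

    nifi.provenance.repository.rollover.events=1000
    nifi.provenance.repository.maintenance.frequency=2 secs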

Signed-off-by: Joe Witt <joewitt@apache.org>
Authored by Mark Payne on 2020-04-15 12:24:46 -04:00; committed by Joe Witt
parent 923a07a5db
commit c19db9d623
22 changed files with 613 additions and 19 deletions

View File

@ -48,6 +48,7 @@ public class StandardQueryResult implements QueryResult, ProgressiveResult {
private final Lock writeLock = rwLock.writeLock();
// guarded by writeLock
private final SortedSet<ProvenanceEventRecord> matchingRecords = new TreeSet<>(new EventIdComparator());
private long hitCount = 0L;
private int numCompletedSteps = 0;
private Date expirationDate;
private String error;
@ -163,6 +164,7 @@ public class StandardQueryResult implements QueryResult, ProgressiveResult {
}
this.matchingRecords.addAll(newEvents);
hitCount += totalHits;
// If we've added more records than the query's max, then remove the trailing elements.
// We do this, rather than avoiding the addition of the elements because we want to choose
@ -188,10 +190,12 @@ public class StandardQueryResult implements QueryResult, ProgressiveResult {
queryComplete = true;
if (numCompletedSteps >= numSteps) {
logger.info("Completed {} comprised of {} steps in {} millis", query, numSteps, queryTime);
logger.info("Completed {} comprised of {} steps in {} millis. Index found {} hits. Read {} events from Event Files.",
query, numSteps, queryTime, hitCount, matchingRecords.size());
} else {
logger.info("Completed {} comprised of {} steps in {} millis (only completed {} steps because the maximum number of results was reached)",
query, numSteps, queryTime, numCompletedSteps);
logger.info("Completed {} comprised of {} steps in {} millis. Index found {} hits. Read {} events from Event Files. "
+ "Only completed {} steps because the maximum number of results was reached.",
query, numSteps, queryTime, hitCount, matchingRecords.size(), numCompletedSteps);
}
}
} finally {

View File

@ -126,6 +126,7 @@ public abstract class NiFiProperties {
public static final String PROVENANCE_MAX_STORAGE_SIZE = "nifi.provenance.repository.max.storage.size";
public static final String PROVENANCE_ROLLOVER_TIME = "nifi.provenance.repository.rollover.time";
public static final String PROVENANCE_ROLLOVER_SIZE = "nifi.provenance.repository.rollover.size";
public static final String PROVENANCE_ROLLOVER_EVENT_COUNT = "nifi.provenance.repository.rollover.events";
public static final String PROVENANCE_QUERY_THREAD_POOL_SIZE = "nifi.provenance.repository.query.threads";
public static final String PROVENANCE_INDEX_THREAD_POOL_SIZE = "nifi.provenance.repository.index.threads";
public static final String PROVENANCE_COMPRESS_ON_ROLLOVER = "nifi.provenance.repository.compress.on.rollover";

View File

@ -39,6 +39,7 @@ public class RepositoryConfiguration {
public static final String CONCURRENT_MERGE_THREADS = "nifi.provenance.repository.concurrent.merge.threads";
public static final String WARM_CACHE_FREQUENCY = "nifi.provenance.repository.warm.cache.frequency";
public static final String MAINTENACE_FREQUENCY = "nifi.provenance.repository.maintenance.frequency";
private final Map<String, File> storageDirectories = new LinkedHashMap<>();
private long recordLifeMillis = TimeUnit.MILLISECONDS.convert(24, TimeUnit.HOURS);
@ -51,6 +52,7 @@ public class RepositoryConfiguration {
private int compressionBlockBytes = 1024 * 1024;
private int maxAttributeChars = 65536;
private int debugFrequency = 1_000_000;
private long maintenanceFrequencyMillis = TimeUnit.MINUTES.toMillis(1L);
// TODO: Delegate to RepositoryEncryptionConfiguration in NIFI-6617
private Map<String, String> encryptionKeys;
@ -416,6 +418,14 @@ public class RepositoryConfiguration {
this.debugFrequency = debugFrequency;
}
public long getMaintenanceFrequency(final TimeUnit timeUnit) {
return timeUnit.convert(maintenanceFrequencyMillis, TimeUnit.MILLISECONDS);
}
public void setMaintenanceFrequency(final long period, final TimeUnit timeUnit) {
this.maintenanceFrequencyMillis = timeUnit.toMillis(period);
}
public static RepositoryConfiguration create(final NiFiProperties nifiProperties) {
final Map<String, Path> storageDirectories = nifiProperties.getProvenanceRepositoryPaths();
@ -426,13 +436,14 @@ public class RepositoryConfiguration {
final String storageSize = nifiProperties.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE, "1 GB");
final String rolloverTime = nifiProperties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_TIME, "5 mins");
final String rolloverSize = nifiProperties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_SIZE, "100 MB");
final int rolloverEventCount = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_ROLLOVER_EVENT_COUNT, Integer.MAX_VALUE);
final String shardSize = nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE, "500 MB");
final int queryThreads = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_QUERY_THREAD_POOL_SIZE, 2);
final int indexThreads = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_INDEX_THREAD_POOL_SIZE, 2);
final int journalCount = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_JOURNAL_COUNT, 16);
final int concurrentMergeThreads = nifiProperties.getIntegerProperty(CONCURRENT_MERGE_THREADS, 2);
final String warmCacheFrequency = nifiProperties.getProperty(WARM_CACHE_FREQUENCY);
final String maintenanceFrequency = nifiProperties.getProperty(MAINTENACE_FREQUENCY);
final long storageMillis = FormatUtils.getTimeDuration(storageTime, TimeUnit.MILLISECONDS);
final long maxStorageBytes = DataUnit.parseDataSize(storageSize, DataUnit.B).longValue();
final long rolloverMillis = FormatUtils.getTimeDuration(rolloverTime, TimeUnit.MILLISECONDS);
@ -475,6 +486,7 @@ public class RepositoryConfiguration {
config.setSearchableFields(searchableFields);
config.setSearchableAttributes(searchableAttributes);
config.setMaxEventFileCapacity(rolloverBytes);
config.setMaxEventFileCount(rolloverEventCount);
config.setMaxEventFileLife(rolloverMillis, TimeUnit.MILLISECONDS);
config.setMaxRecordLife(storageMillis, TimeUnit.MILLISECONDS);
config.setMaxStorageCapacity(maxStorageBytes);
@ -490,6 +502,10 @@ public class RepositoryConfiguration {
if (shardSize != null) {
config.setDesiredIndexSize(DataUnit.parseDataSize(shardSize, DataUnit.B).longValue());
}
if (maintenanceFrequency != null && !maintenanceFrequency.trim().equals("")) {
final long millis = FormatUtils.getTimeDuration(maintenanceFrequency.trim(), TimeUnit.MILLISECONDS);
config.setMaintenanceFrequency(millis, TimeUnit.MILLISECONDS);
}
config.setAlwaysSync(alwaysSync);
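The new maintenance-frequency plumbing can be exercised directly. A minimal sketch (assuming the class's public no-arg constructor; this snippet is not part of the commit):

    // The value is stored internally in milliseconds and converted to the requested
    // unit on read; an unset or blank property leaves the 1-minute field default.
    final RepositoryConfiguration config = new RepositoryConfiguration();
    config.setMaintenanceFrequency(2, TimeUnit.SECONDS);
    final long millis = config.getMaintenanceFrequency(TimeUnit.MILLISECONDS); // 2000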

View File

@ -74,4 +74,9 @@ public class LatestEventsPerProcessorQuery implements CachedQuery {
return Optional.of(eventIds);
}
@Override
public String toString() {
return "Latest Events Per Processor";
}
}

View File

@ -52,4 +52,8 @@ public class LatestEventsQuery implements CachedQuery {
}
}
@Override
public String toString() {
return "Most Recent Events Query";
}
}

View File

@ -606,11 +606,14 @@ public class LuceneEventIndex implements EventIndex {
querySubmissionMap.put(query.getIdentifier(), submission);
final List<Long> eventIds = eventIdListOption.get();
logger.debug("Cached Query {} produced {} Event IDs for {}: {}", cachedQuery, eventIds.size(), query, eventIds);
queryExecutor.submit(() -> {
List<ProvenanceEventRecord> events;
try {
events = eventStore.getEvents(eventIds, authorizer, EventTransformer.EMPTY_TRANSFORMER);
logger.debug("Retrieved {} of {} Events from Event Store", events.size(), eventIds.size());
submission.getResult().update(events, eventIds.size());
} catch (final Exception e) {
submission.getResult().setError("Failed to retrieve Provenance Events from store; see logs for more details");
@ -639,7 +642,7 @@ public class LuceneEventIndex implements EventIndex {
querySubmissionMap.put(query.getIdentifier(), submission);
final org.apache.lucene.search.Query luceneQuery = LuceneUtil.convertQuery(query);
logger.debug("Submitting query {} with identifier {} against index directories {}", luceneQuery, query.getIdentifier(), indexDirectories);
logger.debug("Submitting query {} with identifier {} against {} index directories: {}", luceneQuery, query.getIdentifier(), indexDirectories.size(), indexDirectories);
if (indexDirectories.isEmpty()) {
submission.getResult().update(Collections.emptyList(), 0L);

View File

@ -19,6 +19,8 @@ package org.apache.nifi.provenance.index.lucene;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopDocs;
import org.apache.nifi.provenance.ProgressiveResult;
import org.apache.nifi.provenance.ProvenanceEventRecord;
@ -105,7 +107,7 @@ public class QueryTask implements Runnable {
try {
final long borrowMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - borrowStart);
logger.debug("Borrowing index searcher for {} took {} ms", indexDir, borrowMillis);
logger.trace("Borrowing index searcher for {} took {} ms", indexDir, borrowMillis);
final long startNanos = System.nanoTime();
// If max number of results are retrieved, do not bother querying lucene
@ -124,7 +126,11 @@ public class QueryTask implements Runnable {
final IndexReader indexReader = searcher.getIndexSearcher().getIndexReader();
final TopDocs topDocs;
try {
-topDocs = searcher.getIndexSearcher().search(query, maxResults);
+// Sort based on document id, descending. This gives us most recent events first.
+final Sort sort = new Sort(new SortField(null, SortField.Type.DOC, true));
+topDocs = searcher.getIndexSearcher().search(query, maxResults, sort);
} catch (final Exception e) {
logger.error("Failed to query Lucene for index " + indexDir, e);
queryResult.setError("Failed to query Lucene for index " + indexDir + " due to " + e);
@ -189,7 +195,7 @@ public class QueryTask implements Runnable {
final long endConvert = System.nanoTime();
final long ms = TimeUnit.NANOSECONDS.toMillis(endConvert - start);
logger.debug("Converting documents took {} ms", ms);
logger.trace("Converting documents took {} ms", ms);
List<ProvenanceEventRecord> events;
try {
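The Sort introduced above is the core of this fix: Lucene assigns document ids in insertion order, so sorting on the implicit doc-id field in reverse returns the most recently indexed events first. A standalone sketch of the pattern (indexSearcher, luceneQuery, and maxResults are assumed to exist):

    // Fetch the newest maxResults documents by sorting on doc id, descending.
    final Sort newestFirst = new Sort(new SortField(null, SortField.Type.DOC, true));
    final TopDocs newest = indexSearcher.search(luceneQuery, maxResults, newestFirst);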

View File

@ -62,7 +62,8 @@ public abstract class PartitionedEventStore implements EventStore {
@Override
public void initialize() throws IOException {
maintenanceExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Provenance Repository Maintenance"));
-maintenanceExecutor.scheduleWithFixedDelay(() -> performMaintenance(), 1, 1, TimeUnit.MINUTES);
+final long maintenanceMillis = repoConfig.getMaintenanceFrequency(TimeUnit.MILLISECONDS);
+maintenanceExecutor.scheduleWithFixedDelay(this::performMaintenance, maintenanceMillis, maintenanceMillis, TimeUnit.MILLISECONDS);
for (final EventStorePartition partition : getPartitions()) {
partition.initialize();
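With the override used by the system tests below (nifi.provenance.repository.maintenance.frequency set to "2 secs"), this schedules performMaintenance every two seconds instead of the previously hard-coded one minute, so aged-off Event Files are removed quickly enough for the tests' assertions.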

View File

@ -175,7 +175,7 @@ public class WriteAheadStorePartition implements EventStorePartition {
}
// Claim a Record Writer Lease so that we have a writer to persist the events to
-RecordWriterLease lease = null;
+RecordWriterLease lease;
while (true) {
lease = getLease();
if (lease.tryClaim()) {
@ -185,7 +185,6 @@ public class WriteAheadStorePartition implements EventStorePartition {
final RolloverState rolloverState = lease.getRolloverState();
if (rolloverState.isRollover()) {
final boolean success = tryRollover(lease);
if (success) {
logger.info("Successfully rolled over Event Writer for {} due to {}", this, rolloverState);
}
@ -476,9 +475,16 @@ public class WriteAheadStorePartition implements EventStorePartition {
public void purgeOldEvents(final long olderThan, final TimeUnit unit) {
final long timeCutoff = System.currentTimeMillis() - unit.toMillis(olderThan);
-getEventFilesFromDisk().filter(file -> file.lastModified() < timeCutoff)
+final List<File> removed = getEventFilesFromDisk().filter(file -> file.lastModified() < timeCutoff)
    .sorted(DirectoryUtils.SMALLEST_ID_FIRST)
-    .forEach(this::delete);
+    .filter(this::delete)
+    .collect(Collectors.toList());
if (removed.isEmpty()) {
logger.debug("No Provenance Event files that exceed time-based threshold of {} {}", olderThan, unit);
} else {
logger.info("Purged {} Provenance Event files from Provenance Repository because the events were older than {} {}: {}", removed.size(), olderThan, unit, removed);
}
}
private File getActiveEventFile() {
@ -489,20 +495,27 @@ public class WriteAheadStorePartition implements EventStorePartition {
@Override
public long purgeOldestEvents() {
final List<File> eventFiles = getEventFilesFromDisk().sorted(DirectoryUtils.SMALLEST_ID_FIRST).collect(Collectors.toList());
-if (eventFiles.isEmpty()) {
+if (eventFiles.size() < 2) {
// If there are no Event Files, there's nothing to do. If there is exactly 1 Event File, it means that the only Event File
// that exists is the Active Event File, which we are writing to, so we don't want to remove it either.
return 0L;
}
final File currentFile = getActiveEventFile();
if (currentFile == null) {
logger.debug("There is currently no Active Event File for {}. Will not purge oldest events until the Active Event File has been established.", this);
return 0L;
}
for (final File eventFile : eventFiles) {
if (eventFile.equals(currentFile)) {
-continue;
+break;
}
final long fileSize = eventFile.length();
if (delete(eventFile)) {
logger.debug("{} Deleted {} event file ({}) due to storage limits", this, eventFile, FormatUtils.formatDataSize(fileSize));
logger.info("{} Deleted {} event file ({}) due to storage limits", this, eventFile, FormatUtils.formatDataSize(fileSize));
return fileSize;
} else {
logger.warn("{} Failed to delete oldest event file {}. This file should be cleaned up manually.", this, eventFile);

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.tests.system;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
public class TerminateFlowFile extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
session.remove(flowFile);
}
}
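This test processor drops every FlowFile it receives; session.remove(flowFile) records a DROP provenance event, which is what the age-off tests below count against the Terminate processor's component id.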

View File

@ -14,8 +14,9 @@
# limitations under the License.
org.apache.nifi.processors.tests.system.CountEvents
-org.apache.nifi.processors.tests.system.GenerateFlowFile
-org.apache.nifi.processors.tests.system.Sleep
-org.apache.nifi.processors.tests.system.ValidateFileExists
org.apache.nifi.processors.tests.system.FakeProcessor
org.apache.nifi.processors.tests.system.FakeDynamicPropertiesProcessor
+org.apache.nifi.processors.tests.system.GenerateFlowFile
+org.apache.nifi.processors.tests.system.Sleep
+org.apache.nifi.processors.tests.system.TerminateFlowFile
+org.apache.nifi.processors.tests.system.ValidateFileExists

View File

@ -18,6 +18,8 @@ package org.apache.nifi.tests.system;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.HashMap;
import java.util.Map;
public class InstanceConfiguration {
private final File bootstrapConfigFile;
@ -25,6 +27,7 @@ public class InstanceConfiguration {
private final File flowXmlGz;
private final File stateDirectory;
private final boolean autoStart;
private final Map<String, String> nifiPropertiesOverrides;
private InstanceConfiguration(Builder builder) {
this.bootstrapConfigFile = builder.bootstrapConfigFile;
@ -32,6 +35,7 @@ public class InstanceConfiguration {
this.flowXmlGz = builder.flowXmlGz;
this.stateDirectory = builder.stateDirectory;
this.autoStart = builder.autoStart;
this.nifiPropertiesOverrides = builder.nifiPropertiesOverrides;
}
public File getBootstrapConfigFile() {
@ -54,12 +58,26 @@ public class InstanceConfiguration {
return autoStart;
}
public Map<String, String> getNifiPropertiesOverrides() {
return nifiPropertiesOverrides;
}
public static class Builder {
private File bootstrapConfigFile;
private File instanceDirectory;
private File flowXmlGz;
private File stateDirectory;
private boolean autoStart = true;
private final Map<String, String> nifiPropertiesOverrides = new HashMap<>();
public Builder overrideNifiProperties(final Map<String, String> overrides) {
nifiPropertiesOverrides.clear();
if (overrides != null) {
nifiPropertiesOverrides.putAll(overrides);
}
return this;
}
public Builder bootstrapConfig(final File configFile) {
if (!configFile.exists()) {

View File

@ -20,6 +20,7 @@ import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.toolkit.cli.impl.client.nifi.ConnectionClient;
@ -47,6 +48,8 @@ import org.apache.nifi.web.api.dto.VariableDTO;
import org.apache.nifi.web.api.dto.VariableRegistryDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
import org.apache.nifi.web.api.entity.ConnectionEntity;
@ -67,6 +70,7 @@ import org.apache.nifi.web.api.entity.PortEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProvenanceEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
import org.apache.nifi.web.api.entity.ScheduleComponentsEntity;
import org.apache.nifi.web.api.entity.VariableEntity;
@ -80,6 +84,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -772,4 +777,45 @@ public class NiFiClientUtil {
final ProcessGroupEntity childGroup = nifiClient.getProcessGroupClient().createProcessGroup(parentGroupId, childGroupEntity);
return childGroup;
}
public ProvenanceEntity queryProvenance(final Map<SearchableField, String> searchTerms, final Long startTime, final Long endTime) throws NiFiClientException, IOException {
final Map<String, String> searchTermsAsStrings = searchTerms.entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey().getSearchableFieldName(), Map.Entry::getValue));
final ProvenanceRequestDTO requestDto = new ProvenanceRequestDTO();
requestDto.setSearchTerms(searchTermsAsStrings);
requestDto.setSummarize(false);
requestDto.setStartDate(startTime == null ? null : new Date(startTime));
requestDto.setEndDate(endTime == null ? null : new Date(endTime));
requestDto.setMaxResults(1000);
final ProvenanceDTO dto = new ProvenanceDTO();
dto.setRequest(requestDto);
dto.setSubmissionTime(new Date());
final ProvenanceEntity entity = new ProvenanceEntity();
entity.setProvenance(dto);
ProvenanceEntity responseEntity = nifiClient.getProvenanceClient().submitProvenanceQuery(entity);
try {
responseEntity = waitForComplete(responseEntity);
} catch (final InterruptedException ie) {
Assert.fail("Interrupted while waiting for Provenance Query to complete");
}
nifiClient.getProvenanceClient().deleteProvenanceQuery(responseEntity.getProvenance().getId());
return responseEntity;
}
public ProvenanceEntity waitForComplete(final ProvenanceEntity entity) throws InterruptedException, NiFiClientException, IOException {
ProvenanceEntity current = entity;
while (current.getProvenance().isFinished() != Boolean.TRUE) {
Thread.sleep(100L);
current = nifiClient.getProvenanceClient().getProvenanceQuery(entity.getProvenance().getId());
}
return current;
}
}
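Typical usage of the new helper, as exercised by the system tests later in this commit (componentId here is a placeholder):

    // Query provenance for all events emitted by one component and read the results.
    final Map<SearchableField, String> terms =
            Collections.singletonMap(SearchableFields.ComponentID, componentId);
    final ProvenanceEntity entity = clientUtil.queryProvenance(terms, null, null);
    final List<ProvenanceEventDTO> events = entity.getProvenance().getResults().getProvenanceEvents();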

View File

@ -33,6 +33,8 @@ import org.junit.rules.Timeout;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
@ -98,6 +100,10 @@ public abstract class NiFiSystemIT {
if (isDestroyFlowAfterEachTest()) {
destroyFlow();
}
if (isDestroyEnvironmentAfterEachTest()) {
cleanup();
}
} finally {
if (nifiClient != null) {
nifiClient.close();
@ -105,6 +111,10 @@ public abstract class NiFiSystemIT {
}
}
protected boolean isDestroyEnvironmentAfterEachTest() {
return false;
}
protected void destroyFlow() throws NiFiClientException, IOException {
getClientUtil().stopProcessGroupComponents("root");
getClientUtil().disableControllerServices("root");
@ -229,16 +239,25 @@ public abstract class NiFiSystemIT {
new InstanceConfiguration.Builder()
.bootstrapConfig("src/test/resources/conf/default/bootstrap.conf")
.instanceDirectory("target/standalone-instance")
.overrideNifiProperties(getNifiPropertiesOverrides())
.build());
}
protected Map<String, String> getNifiPropertiesOverrides() {
return Collections.emptyMap();
}
protected boolean isDestroyFlowAfterEachTest() {
return true;
}
protected void waitFor(final BooleanSupplier condition) throws InterruptedException {
waitFor(condition, 10L);
}
protected void waitFor(final BooleanSupplier condition, final long delayMillis) throws InterruptedException {
while (!condition.getAsBoolean()) {
-Thread.sleep(10L);
+Thread.sleep(delayMillis);
}
}

View File

@ -133,6 +133,24 @@ public class SpawnedStandaloneNiFiInstanceFactory implements NiFiInstanceFactory
final File destinationFlowXmlGz = new File(destinationConf, "flow.xml.gz");
Files.copy(flowXmlGz.toPath(), destinationFlowXmlGz.toPath());
}
// Write out any Property overrides
final Map<String, String> nifiPropertiesOverrides = instanceConfiguration.getNifiPropertiesOverrides();
if (nifiPropertiesOverrides != null && !nifiPropertiesOverrides.isEmpty()) {
final File destinationNifiProperties = new File(destinationConf, "nifi.properties");
final File sourceNifiProperties = new File(bootstrapConfigFile.getParentFile(), "nifi.properties");
final Properties nifiProperties = new Properties();
try (final InputStream fis = new FileInputStream(sourceNifiProperties)) {
nifiProperties.load(fis);
}
nifiPropertiesOverrides.forEach(nifiProperties::setProperty);
try (final OutputStream fos = new FileOutputStream(destinationNifiProperties)) {
nifiProperties.store(fos, null);
}
}
}
private void copyContents(final File dir, final File destinationDir) throws IOException {

View File

@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.tests.system.provenance;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.SearchableFields;
import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProvenanceEntity;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import static org.junit.Assert.assertEquals;
public class ProvenanceRepositoryIT extends NiFiSystemIT {
@Override
protected Map<String, String> getNifiPropertiesOverrides() {
final Map<String, String> properties = new HashMap<>();
// Force only a single Provenance Event File to exist
properties.put("nifi.provenance.repository.max.storage.size", "1 KB");
// Perform maintenance every 2 seconds to ensure that we don't have to wait a long time for old event files to roll off.
properties.put("nifi.provenance.repository.maintenance.frequency", "2 secs");
return properties;
}
@Override
protected boolean isDestroyEnvironmentAfterEachTest() {
// We need to destroy entire environment after each test to ensure that the repositories are destroyed.
// This is important because we are expecting exact numbers of events in the repo.
return true;
}
@Test
public void testSimpleQueryByComponentID() throws NiFiClientException, IOException, InterruptedException {
final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile");
final ProcessorEntity count = getClientUtil().createProcessor("CountEvents");
getClientUtil().setAutoTerminatedRelationships(count, "success");
getClientUtil().createConnection(generateFlowFile, count, "success");
getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
getNifiClient().getProcessorClient().startProcessor(count);
final Map<SearchableField, String> searchTerms = Collections.singletonMap(SearchableFields.ComponentID, generateFlowFile.getId());
ProvenanceEntity provenanceEntity = getClientUtil().queryProvenance(searchTerms, null, null);
assertEquals(0, provenanceEntity.getProvenance().getResults().getProvenanceEvents().size());
// Wait for there to be at least 1 event.
waitForEventCountAtLeast(searchTerms, 1);
provenanceEntity = getClientUtil().queryProvenance(searchTerms, null, null);
final List<ProvenanceEventDTO> events = provenanceEntity.getProvenance().getResults().getProvenanceEvents();
assertEquals(1, events.size());
final ProvenanceEventDTO firstEvent = events.get(0);
assertEquals(ProvenanceEventType.CREATE.name(), firstEvent.getEventType());
}
// If we add some events for Component ABC and then they age off, we should be able to query and get back 0 results.
// If we then add some more events for Component ABC and query, we should see those new events. Even if we have aged off
// 1000+ events (1000 = max results of the provenance query). This should be true whether NiFi is restarted in between or not.
// To ensure this, we have two tests that are very similar but one restarts NiFi in between and one does not.
// This test does not restart NiFi.
@Test
public void testAgeOffEventsThenAddSomeThenQuery() throws NiFiClientException, IOException, InterruptedException {
ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile");
generateFlowFile = getClientUtil().updateProcessorProperties(generateFlowFile, Collections.singletonMap("Batch Size", "800"));
final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
getClientUtil().setAutoTerminatedRelationships(terminate, "success");
getClientUtil().createConnection(generateFlowFile, terminate, "success");
generateFlowFile = getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
final Map<SearchableField, String> generateSearchTerms = Collections.singletonMap(SearchableFields.ComponentID, generateFlowFile.getId());
// Wait for there to be at least 800 events for the Generate processor and then stop the processor
waitForEventCountAtLeast(generateSearchTerms, 800);
getNifiClient().getProcessorClient().stopProcessor(generateFlowFile);
// Start Terminate proc & wait for at least 600 events to be registered. We do this because each Event File can hold up to 1,000 Events.
// The GenerateFlowFile would have 800. The first 200 events from Terminate will be in the first Event File, causing that one to
// roll over and subsequently be aged off. The second Event File will hold the other 600. So we may have 600 or 800 events,
// depending on when the query is executed.
getNifiClient().getProcessorClient().startProcessor(terminate);
final Map<SearchableField, String> terminateSearchTerms = Collections.singletonMap(SearchableFields.ComponentID, terminate.getId());
waitForEventCountAtLeast(terminateSearchTerms, 600);
waitForEventCountExactly(generateSearchTerms, 0);
// Emit 25 more events
getClientUtil().updateProcessorProperties(generateFlowFile, Collections.singletonMap("Batch Size", "25"));
getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
// Wait for those 25 events to be emitted
waitForEventCountAtLeast(generateSearchTerms, 25);
}
// If we add some events for Component ABC and then they age off, we should be able to query and get back 0 results.
// If we then add some more events for Component ABC and query, we should see those new events. Even if we have aged off
// 1000+ events (1000 = max results of the provenance query). This should be true whether NiFi is restarted in between or not.
// To ensure this, we have two tests that are very similar but one restarts NiFi in between and one does not.
// This test does restart NiFi.
@Test
public void testAgeOffEventsThenRestartAddSomeThenQuery() throws NiFiClientException, IOException, InterruptedException {
ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile");
generateFlowFile = getClientUtil().updateProcessorProperties(generateFlowFile, Collections.singletonMap("Batch Size", "800"));
final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
getClientUtil().setAutoTerminatedRelationships(terminate, "success");
getClientUtil().createConnection(generateFlowFile, terminate, "success");
generateFlowFile = getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
final Map<SearchableField, String> generateSearchTerms = Collections.singletonMap(SearchableFields.ComponentID, generateFlowFile.getId());
// Wait for there to be at least 800 events for Generate processor and then stop it
waitForEventCountAtLeast(generateSearchTerms, 800);
getNifiClient().getProcessorClient().stopProcessor(generateFlowFile);
// Start Terminate proc & wait for at least 600 events to be registered. We do this because each Event File can hold up to 1,000 Events.
// The GenerateFlowFile would have 800. The first 200 events from Terminate will be in the first Event File, causing that one to
// roll over and subsequently be aged off. The second Event File will hold the other 600. So we may have 600 or 800 events,
// depending on when the query is executed.
getNifiClient().getProcessorClient().startProcessor(terminate);
final Map<SearchableField, String> terminateSearchTerms = Collections.singletonMap(SearchableFields.ComponentID, terminate.getId());
waitForEventCountAtLeast(terminateSearchTerms, 600);
getNifiClient().getProcessorClient().stopProcessor(terminate);
waitForEventCountExactly(generateSearchTerms, 0);
// Restart NiFi. We do this so that when we query provenance for the Processor we won't be able to use the "Cached" events
// and will instead have to query Lucene
getNiFiInstance().stop();
getNiFiInstance().start();
// Ensure that the Terminate processor is stopped, since NiFi could have shut down before persisting flow.xml.gz
terminate.getRevision().setVersion(0L); // Reset the revision
getNifiClient().getProcessorClient().stopProcessor(terminate);
getClientUtil().waitForStoppedProcessor(terminate.getId());
// Emit 400 more events
generateFlowFile.getRevision().setVersion(0L); // Reset the revision
getClientUtil().updateProcessorProperties(generateFlowFile, Collections.singletonMap("Batch Size", "400"));
getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
// Since we restarted, the previous Event File will be rolled over. And since it will be > 1 KB in size, it will age off almost immediately.
// This will leave us with only the 400 newly created events.
waitForEventCountExactly(generateSearchTerms, 400);
}
private void waitForEventCountExactly(final Map<SearchableField, String> searchTerms, final int expectedCount) throws InterruptedException {
waitForEventCount(searchTerms, count -> count == expectedCount);
}
private void waitForEventCountAtLeast(final Map<SearchableField, String> searchTerms, final int expectedCount) throws InterruptedException {
waitForEventCount(searchTerms, count -> count >= expectedCount);
}
private void waitForEventCount(final Map<SearchableField, String> searchTerms, final Predicate<Integer> predicate) throws InterruptedException {
// Poll until the number of events matching the search terms satisfies the given predicate
waitFor(() -> {
try {
return predicate.test(getEventCount(searchTerms));
} catch (final Exception e) {
return false;
}
}, 500L);
}
private int getEventCount(final Map<SearchableField, String> searchTerms) throws NiFiClientException, IOException {
ProvenanceEntity provEntity = getClientUtil().queryProvenance(searchTerms, null, null);
return provEntity.getProvenance().getResults().getProvenanceEvents().size();
}
}

View File

@ -96,6 +96,7 @@ nifi.provenance.repository.directory.default=./provenance_repository
nifi.provenance.repository.max.storage.time=24 hours
nifi.provenance.repository.max.storage.size=1 GB
nifi.provenance.repository.rollover.time=30 secs
nifi.provenance.repository.rollover.events=1000
nifi.provenance.repository.rollover.size=100 MB
nifi.provenance.repository.query.threads=2
nifi.provenance.repository.index.threads=2

View File

@ -33,6 +33,7 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.ParamContextClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.PoliciesClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessGroupClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessorClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.ProvenanceClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.RemoteProcessGroupClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.ReportingTasksClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.TemplatesClient;
@ -372,6 +373,21 @@ public class NiFiClientFactory implements ClientFactory<NiFiClient> {
return wrappedClient.getOutputPortClientForToken(proxiedEntity);
}
@Override
public ProvenanceClient getProvenanceClient() {
return wrappedClient.getProvenanceClient();
}
@Override
public ProvenanceClient getProvenanceClientForProxiedEntities(final String... proxiedEntity) {
return wrappedClient.getProvenanceClientForProxiedEntities(proxiedEntity);
}
@Override
public ProvenanceClient getProvenanceClientForToken(final String token) {
return wrappedClient.getProvenanceClientForToken(token);
}
@Override
public void close() throws IOException {
wrappedClient.close();

View File

@ -184,6 +184,14 @@ public interface NiFiClient extends Closeable {
OutputPortClient getOutputPortClientForToken(String token);
// ----- ProvenanceClient -----
ProvenanceClient getProvenanceClient();
ProvenanceClient getProvenanceClientForProxiedEntities(String... proxiedEntity);
ProvenanceClient getProvenanceClientForToken(String token);
/**
* The builder interface that implementations should provide for obtaining the client.

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.cli.impl.client.nifi;
import org.apache.nifi.web.api.entity.LineageEntity;
import org.apache.nifi.web.api.entity.ProvenanceEntity;
import java.io.IOException;
public interface ProvenanceClient {
ProvenanceEntity submitProvenanceQuery(ProvenanceEntity provenanceQuery) throws NiFiClientException, IOException;
ProvenanceEntity getProvenanceQuery(String queryId) throws NiFiClientException, IOException;
ProvenanceEntity deleteProvenanceQuery(String queryId) throws NiFiClientException, IOException;
LineageEntity submitLineageRequest(LineageEntity lineageEntity) throws NiFiClientException, IOException;
LineageEntity getLineageRequest(String lineageRequestId) throws NiFiClientException, IOException;
LineageEntity deleteLineageRequest(String lineageRequestId) throws NiFiClientException, IOException;
}
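A typical query lifecycle against this interface mirrors NiFiClientUtil.waitForComplete above (a sketch; the polling interval is illustrative):

    // Submit the query, poll until NiFi marks it finished, then clean it up server-side.
    ProvenanceEntity result = provenanceClient.submitProvenanceQuery(queryEntity);
    while (result.getProvenance().isFinished() != Boolean.TRUE) {
        Thread.sleep(100L);
        result = provenanceClient.getProvenanceQuery(result.getProvenance().getId());
    }
    provenanceClient.deleteProvenanceQuery(result.getProvenance().getId());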

View File

@ -35,6 +35,7 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.ParamContextClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.PoliciesClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessGroupClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessorClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.ProvenanceClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.RemoteProcessGroupClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.ReportingTasksClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.TemplatesClient;
@ -392,6 +393,23 @@ public class JerseyNiFiClient implements NiFiClient {
return new JerseyOutputPortClient(baseTarget, headers);
}
@Override
public ProvenanceClient getProvenanceClient() {
return new JerseyProvenanceClient(baseTarget);
}
@Override
public ProvenanceClient getProvenanceClientForProxiedEntities(final String... proxiedEntity) {
final Map<String, String> headers = getHeaders(proxiedEntity);
return new JerseyProvenanceClient(baseTarget, headers);
}
@Override
public ProvenanceClient getProvenanceClientForToken(final String token) {
final Map<String, String> headers = getHeadersWithToken(token);
return new JerseyProvenanceClient(baseTarget, headers);
}
@Override
public void close() {
if (this.client != null) {

View File

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.cli.impl.client.nifi.impl;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.toolkit.cli.impl.client.nifi.ProvenanceClient;
import org.apache.nifi.web.api.entity.LineageEntity;
import org.apache.nifi.web.api.entity.ProvenanceEntity;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
public class JerseyProvenanceClient extends AbstractJerseyClient implements ProvenanceClient {
private final WebTarget provenanceTarget;
public JerseyProvenanceClient(final WebTarget baseTarget) {
this(baseTarget, Collections.emptyMap());
}
public JerseyProvenanceClient(final WebTarget baseTarget, final Map<String, String> headers) {
super(headers);
this.provenanceTarget = baseTarget.path("/provenance");
}
@Override
public ProvenanceEntity submitProvenanceQuery(final ProvenanceEntity provenanceEntity) throws NiFiClientException, IOException {
if (provenanceEntity == null) {
throw new IllegalArgumentException("Provenance entity cannot be null");
}
return executeAction("Error submitting Provenance Query", () -> getRequestBuilder(provenanceTarget).post(
Entity.entity(provenanceEntity, MediaType.APPLICATION_JSON_TYPE),
ProvenanceEntity.class
));
}
@Override
public ProvenanceEntity getProvenanceQuery(final String queryId) throws NiFiClientException, IOException {
if (StringUtils.isBlank(queryId)) {
throw new IllegalArgumentException("Query ID cannot be null");
}
return executeAction("Error retrieving status of Provenance Query", () -> {
final WebTarget target = provenanceTarget.path("/{id}").resolveTemplate("id", queryId);
return getRequestBuilder(target).get(ProvenanceEntity.class);
});
}
@Override
public ProvenanceEntity deleteProvenanceQuery(final String provenanceQueryId) throws NiFiClientException, IOException {
if (provenanceQueryId == null) {
throw new IllegalArgumentException("Provenance Query ID cannot be null");
}
return executeAction("Error deleting Provenance Query", () -> {
final WebTarget target = provenanceTarget.path("/{id}")
.resolveTemplate("id", provenanceQueryId);
return getRequestBuilder(target).delete(ProvenanceEntity.class);
});
}
@Override
public LineageEntity submitLineageRequest(final LineageEntity lineageEntity) throws NiFiClientException, IOException {
if (lineageEntity == null) {
throw new IllegalArgumentException("Lineage entity cannot be null");
}
return executeAction("Error submitting Provenance Lineage Request", () -> getRequestBuilder(provenanceTarget.path("lineage")).post(
Entity.entity(lineageEntity, MediaType.APPLICATION_JSON_TYPE),
LineageEntity.class
));
}
@Override
public LineageEntity getLineageRequest(final String lineageRequestId) throws NiFiClientException, IOException {
if (StringUtils.isBlank(lineageRequestId)) {
throw new IllegalArgumentException("Lineage Request ID cannot be null");
}
return executeAction("Error retrieving status of Provenance Lineage Request", () -> {
final WebTarget target = provenanceTarget.path("/lineage/{id}").resolveTemplate("id", lineageRequestId);
return getRequestBuilder(target).get(LineageEntity.class);
});
}
@Override
public LineageEntity deleteLineageRequest(final String lineageRequestId) throws NiFiClientException, IOException {
if (lineageRequestId == null) {
throw new IllegalArgumentException("Lineage Request ID cannot be null");
}
return executeAction("Error deleting Provenance Lineage Request", () -> {
final WebTarget target = provenanceTarget.path("/lineage/{id}")
.resolveTemplate("id", lineageRequestId);
return getRequestBuilder(target).delete(LineageEntity.class);
});
}
}