NIFI-11557: Avoid using the expensive and unnecessary Files.walkFileTree on startup and initialization of Content Repository. Also performed some code cleanup: IntelliJ flagged many warnings in the class, mostly around methods that are no longer used and potential NullPointerExceptions, so those were cleaned up. Additionally, removed the nifi property for max flowfiles per claim - this property was never implemented. It was referenced, but the way in which is was used curiously had nothing to do with what the property was intended to be used for or for how it was documented. Instead, it was used to limit the max number of claims that could remain writable. As a result, it was removed.

NIFI-11557: Added an additional system test and updated github actions to include surefire-report in order to help diagnose problem that occurred in one of the last system-test runs in Github. Could not replicate problem locally
Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #7265
This commit is contained in:
Mark Payne 2023-05-18 11:54:50 -04:00 committed by Matthew Burgess
parent ec01bce207
commit a12c9ca9c7
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
12 changed files with 366 additions and 295 deletions

View File

@ -110,6 +110,9 @@ jobs:
nifi-system-tests/nifi-system-test-suite/target/surefire-reports/**/*.txt
nifi-system-tests/nifi-system-test-suite/target/troubleshooting/**/*
retention-days: 7
- name: Upload Surefire Report
if: failure()
uses: scacap/action-surefire-report@v1
ubuntu:
timeout-minutes: 120
@ -154,6 +157,9 @@ jobs:
nifi-system-tests/nifi-system-test-suite/target/surefire-reports/**/*.txt
nifi-system-tests/nifi-system-test-suite/target/troubleshooting/**/*
retention-days: 7
- name: Upload Surefire Report
if: failure()
uses: scacap/action-surefire-report@v1
macos:
timeout-minutes: 120
@ -198,3 +204,6 @@ jobs:
nifi-system-tests/nifi-system-test-suite/target/surefire-reports/**/*.txt
nifi-system-tests/nifi-system-test-suite/target/troubleshooting/**/*
retention-days: 7
- name: Upload Surefire Report
if: failure()
uses: scacap/action-surefire-report@v1

View File

@ -103,7 +103,6 @@ public class NiFiProperties extends ApplicationProperties {
public static final String REPOSITORY_CONTENT_PREFIX = "nifi.content.repository.directory.";
public static final String CONTENT_REPOSITORY_IMPLEMENTATION = "nifi.content.repository.implementation";
public static final String MAX_APPENDABLE_CLAIM_SIZE = "nifi.content.claim.max.appendable.size";
public static final String MAX_FLOWFILES_PER_CLAIM = "nifi.content.claim.max.flow.files";
public static final String CONTENT_ARCHIVE_MAX_RETENTION_PERIOD = "nifi.content.repository.archive.max.retention.period";
public static final String CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE = "nifi.content.repository.archive.max.usage.percentage";
public static final String CONTENT_ARCHIVE_BACK_PRESSURE_PERCENTAGE = "nifi.content.repository.archive.backpressure.percentage";
@ -375,7 +374,6 @@ public class NiFiProperties extends ApplicationProperties {
public static final String DEFAULT_NAR_LIBRARY_DIR = "./lib";
public static final String DEFAULT_NAR_LIBRARY_AUTOLOAD_DIR = "./extensions";
public static final String DEFAULT_FLOWFILE_CHECKPOINT_INTERVAL = "20 secs";
public static final int DEFAULT_MAX_FLOWFILES_PER_CLAIM = 100;
public static final String DEFAULT_MAX_APPENDABLE_CLAIM_SIZE = "1 MB";
public static final int DEFAULT_QUEUE_SWAP_THRESHOLD = 20000;
public static final long DEFAULT_BACKPRESSURE_COUNT = 10_000L;
@ -1565,22 +1563,6 @@ public class NiFiProperties extends ApplicationProperties {
return provenanceRepositoryPaths;
}
/**
* Returns the number of claims to keep open for writing. Ideally, this will be at
* least as large as the number of threads that will be updating the repository simultaneously but we don't want
* to get too large because it will hold open up to this many FileOutputStreams.
* <p>
* Default is {@link #DEFAULT_MAX_FLOWFILES_PER_CLAIM}
*
* @return the maximum number of flow files per claim
*/
public int getMaxFlowFilesPerClaim() {
try {
return Integer.parseInt(getProperty(MAX_FLOWFILES_PER_CLAIM));
} catch (NumberFormatException nfe) {
return DEFAULT_MAX_FLOWFILES_PER_CLAIM;
}
}
/**
* Returns the maximum size, in bytes, that claims should grow before writing a new file. This means that we won't continually write to one

View File

@ -1276,9 +1276,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
LOG.info("Creating Content Repository [{}]", implementationClassName);
try {
final ContentRepository contentRepo = NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, ContentRepository.class, properties);
synchronized (contentRepo) {
contentRepo.initialize(new StandardContentRepositoryContext(resourceClaimManager, createEventReporter()));
}
contentRepo.initialize(new StandardContentRepositoryContext(resourceClaimManager, createEventReporter()));
return contentRepo;
} catch (final Exception e) {
throw new RuntimeException(e);

View File

@ -64,7 +64,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@ -120,7 +119,6 @@ public class FileSystemRepository implements ContentRepository {
// in order to avoid backpressure on session commits. With 1 MB as the target file size, 100's of thousands of
// files would mean that we are writing gigabytes per second - quite a bit faster than any disks can handle now.
private final long maxAppendableClaimLength;
private final int maxFlowFilesPerClaim;
private final long maxArchiveMillis;
private final Map<String, Long> minUsableContainerBytesForArchive = new HashMap<>();
private final boolean alwaysSync;
@ -132,27 +130,9 @@ public class FileSystemRepository implements ContentRepository {
// Map of container to archived files that should be deleted next.
private final Map<String, BlockingQueue<ArchiveInfo>> archivedFiles = new HashMap<>();
// guarded by synchronizing on this
private final AtomicLong oldestArchiveDate = new AtomicLong(0L);
private final NiFiProperties nifiProperties;
/**
* Default no args constructor for service loading only
*/
public FileSystemRepository() {
containers = null;
containerNames = null;
index = null;
archiveData = false;
maxArchiveMillis = 0;
alwaysSync = false;
containerCleanupExecutor = null;
nifiProperties = null;
maxAppendableClaimLength = 0;
maxFlowFilesPerClaim = 0;
writableClaimQueue = null;
}
public FileSystemRepository(final NiFiProperties nifiProperties) throws IOException {
this.nifiProperties = nifiProperties;
@ -161,8 +141,7 @@ public class FileSystemRepository implements ContentRepository {
for (final Path path : fileRespositoryPaths.values()) {
Files.createDirectories(path);
}
this.maxFlowFilesPerClaim = nifiProperties.getMaxFlowFilesPerClaim();
this.writableClaimQueue = new LinkedBlockingQueue<>(maxFlowFilesPerClaim);
this.writableClaimQueue = new LinkedBlockingQueue<>(1024);
final long configuredAppendableClaimLength = DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), DataUnit.B).longValue();
final long appendableClaimLengthCap = DataUnit.parseDataSize(APPENDABLE_CLAIM_LENGTH_CAP, DataUnit.B).longValue();
if (configuredAppendableClaimLength > appendableClaimLengthCap) {
@ -230,7 +209,7 @@ public class FileSystemRepository implements ContentRepository {
throw new RuntimeException("System returned total space of the partition for " + containerName + " is zero byte. Nifi can not create a zero sized FileSystemRepository");
}
final long maxArchiveBytes = (long) (capacity * (1D - (maxArchiveRatio - 0.02)));
minUsableContainerBytesForArchive.put(container.getKey(), Long.valueOf(maxArchiveBytes));
minUsableContainerBytesForArchive.put(container.getKey(), maxArchiveBytes);
LOG.info("Maximum Threshold for Container {} set to {} bytes; if volume exceeds this size, archived data will be deleted until it no longer exceeds this size",
containerName, maxArchiveBytes);
@ -247,7 +226,7 @@ public class FileSystemRepository implements ContentRepository {
if (maxArchiveRatio <= 0D) {
maxArchiveMillis = 0L;
} else {
maxArchiveMillis = StringUtils.isEmpty(maxArchiveRetentionPeriod) ? Long.MAX_VALUE : FormatUtils.getTimeDuration(maxArchiveRetentionPeriod, TimeUnit.MILLISECONDS);
maxArchiveMillis = StringUtils.isEmpty(maxArchiveRetentionPeriod) ? Long.MAX_VALUE : Math.round(FormatUtils.getPreciseTimeDuration(maxArchiveRetentionPeriod, TimeUnit.MILLISECONDS));
}
this.alwaysSync = Boolean.parseBoolean(nifiProperties.getProperty("nifi.content.repository.always.sync"));
@ -294,7 +273,7 @@ public class FileSystemRepository implements ContentRepository {
for (final OutputStream out : writableClaimStreams.values()) {
try {
out.close();
} catch (final IOException ioe) {
} catch (final IOException ignored) {
}
}
}
@ -308,7 +287,7 @@ public class FileSystemRepository implements ContentRepository {
private synchronized void initializeRepository() throws IOException {
final Map<String, Path> realPathMap = new HashMap<>();
final ExecutorService executor = Executors.newFixedThreadPool(containers.size());
final List<Future<Long>> futures = new ArrayList<>();
final List<Future<?>> futures = new ArrayList<>();
// Run through each of the containers. For each container, create the sections if necessary.
// Then, we need to scan through the archived data so that we can determine what the oldest
@ -332,61 +311,18 @@ public class FileSystemRepository implements ContentRepository {
realPathMap.put(containerName, realPath);
// We need to scan the archive directories to find out the oldest timestamp so that know whether or not we
// will have to delete archived data based on time threshold. Scanning all of the directories can be very
// expensive because of all of the disk accesses. So we do this in multiple threads. Since containers are
// often unique to a disk, we just map 1 thread to each container.
final Callable<Long> scanContainer = new Callable<Long>() {
@Override
public Long call() throws IOException {
final AtomicLong oldestDateHolder = new AtomicLong(0L);
// the path already exists, so scan the path to find any files and update maxIndex to the max of
// all filenames seen.
Files.walkFileTree(realPath, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
LOG.warn("Content repository contains un-readable file or directory '" + file.getFileName() + "'. Skipping. ", exc);
return FileVisitResult.SKIP_SUBTREE;
}
@Override
public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) throws IOException {
if (attrs.isDirectory()) {
return FileVisitResult.CONTINUE;
}
// Check if this is an 'archive' directory
if (isArchived(file)) {
final long lastModifiedTime = getLastModTime(file);
if (lastModifiedTime < oldestDateHolder.get()) {
oldestDateHolder.set(lastModifiedTime);
}
containerState.incrementArchiveCount();
}
return FileVisitResult.CONTINUE;
}
});
return oldestDateHolder.get();
}
};
// If the path didn't exist to begin with, there's no archive directory, so don't bother scanning.
if (pathExists) {
futures.add(executor.submit(scanContainer));
futures.add(executor.submit(() -> scanArchiveDirectories(realPath.toFile(), containerState)));
}
}
executor.shutdown();
for (final Future<Long> future : futures) {
// Wait for all futures to complete
for (final Future<?> future : futures) {
try {
final Long oldestDate = future.get();
if (oldestDate < oldestArchiveDate.get()) {
oldestArchiveDate.set(oldestDate);
}
future.get();
} catch (final ExecutionException | InterruptedException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
@ -400,6 +336,23 @@ public class FileSystemRepository implements ContentRepository {
containers.putAll(realPathMap);
}
private void scanArchiveDirectories(final File containerDir, final ContainerState containerState) {
for (int i=0; i < SECTIONS_PER_CONTAINER; i++) {
final File sectionDir = new File(containerDir, String.valueOf(i));
final File archiveDir = new File(sectionDir, ARCHIVE_DIR_NAME);
if (!archiveDir.exists()) {
continue;
}
final String[] filenames = archiveDir.list();
if (filenames == null) {
continue;
}
containerState.incrementArchiveCount(filenames.length);
}
}
// Visible for testing
boolean isArchived(final Path path) {
return isArchived(path.toFile());
@ -509,7 +462,7 @@ public class FileSystemRepository implements ContentRepository {
}
private void removeIncompleteContent(final Path fileToRemove, final String containerName) {
String fileDescription = null;
String fileDescription;
try {
fileDescription = fileToRemove.toFile().getAbsolutePath() + " (" + Files.size(fileToRemove) + " bytes)";
} catch (final IOException e) {
@ -556,16 +509,13 @@ public class FileSystemRepository implements ContentRepository {
}
@Override
public Set<ResourceClaim> getActiveResourceClaims(final String containerName) throws IOException {
public Set<ResourceClaim> getActiveResourceClaims(final String containerName) {
final Path containerPath = containers.get(containerName);
if (containerPath == null) {
return Collections.emptySet();
}
final ScanForActiveResourceClaims scan = new ScanForActiveResourceClaims(containerPath, containerName, resourceClaimManager, containers.keySet());
Files.walkFileTree(containerPath, scan);
final Set<ResourceClaim> activeResourceClaims = scan.getActiveResourceClaims();
final Set<ResourceClaim> activeResourceClaims = getActiveResourceClaims(containerPath.toFile(), containerName);
LOG.debug("Obtaining active resource claims, will return a list of {} resource claims for container {}", activeResourceClaims.size(), containerName);
if (LOG.isTraceEnabled()) {
@ -576,6 +526,40 @@ public class FileSystemRepository implements ContentRepository {
return activeResourceClaims;
}
public Set<ResourceClaim> getActiveResourceClaims(final File containerDir, final String containerName) {
final Set<ResourceClaim> activeResourceClaims = new HashSet<>();
for (int i=0; i < SECTIONS_PER_CONTAINER; i++) {
final String sectionName = String.valueOf(i);
final File sectionDir = new File(containerDir, sectionName);
if (!sectionDir.exists()) {
continue;
}
final File[] files = sectionDir.listFiles();
if (files == null) {
LOG.warn("Content repository contains un-readable file or directory [{}]. Skipping.", sectionDir.getAbsolutePath());
continue;
}
for (final File file : files) {
if (ARCHIVE_DIR_NAME.equals(file.getName())) {
continue;
}
final String identifier = file.getName();
ResourceClaim resourceClaim = resourceClaimManager.getResourceClaim(containerName, sectionName, identifier);
if (resourceClaim == null) {
resourceClaim = resourceClaimManager.newResourceClaim(containerName, sectionName, identifier, false, false);
}
activeResourceClaims.add(resourceClaim);
}
}
return activeResourceClaims;
}
private Path getPath(final ContentClaim claim) {
final ResourceClaim resourceClaim = claim.getResourceClaim();
return getPath(resourceClaim);
@ -683,7 +667,11 @@ public class FileSystemRepository implements ContentRepository {
// at the same time because we will call create() to get the claim before we write to it,
// and when we call create(), it will remove it from the Queue, which means that no other
// thread will get the same Claim until we've finished writing to it.
final File file = getPath(resourceClaim).toFile();
final Path resourceClaimPath = getPath(resourceClaim);
if (resourceClaimPath == null) {
throw new IOException("Could not determine file to write to for " + resourceClaim);
}
final File file = resourceClaimPath.toFile();
ByteCountingOutputStream claimStream = new SynchronizedByteCountingOutputStream(new FileOutputStream(file, true), file.length());
writableClaimStreams.put(resourceClaim, claimStream);
@ -753,7 +741,7 @@ public class FileSystemRepository implements ContentRepository {
Path path = null;
try {
path = getPath(claim);
} catch (final ContentNotFoundException cnfe) {
} catch (final ContentNotFoundException ignored) {
}
// Ensure that we have no writable claim streams for this resource claim
@ -767,10 +755,12 @@ public class FileSystemRepository implements ContentRepository {
}
}
final File file = path.toFile();
if (!file.delete() && file.exists()) {
LOG.warn("Unable to delete {} at path {}", new Object[]{claim, path});
return false;
if (path != null) {
final File file = path.toFile();
if (!file.delete() && file.exists()) {
LOG.warn("Unable to delete {} at path {}", new Object[]{claim, path});
return false;
}
}
return true;
@ -1028,7 +1018,6 @@ public class FileSystemRepository implements ContentRepository {
final ByteCountingOutputStream bcos = claimStream;
// TODO: Refactor OS implementation out (deduplicate methods, etc.)
final OutputStream out = new ContentRepositoryOutputStream(scc, bcos, initialLength);
LOG.debug("Writing to {}", out);
@ -1080,9 +1069,13 @@ public class FileSystemRepository implements ContentRepository {
try {
Thread.sleep(100L);
} catch (final Exception e) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while attempting to purge Content Repository", e);
return;
}
}
}
if (!writable) {
throw new RepositoryPurgeException("File " + path.toFile().getAbsolutePath() + " is not writable");
}
@ -1127,7 +1120,7 @@ public class FileSystemRepository implements ContentRepository {
}
}
} catch (final Throwable t) {
LOG.error("Failed to cleanup content claims due to {}", t);
LOG.error("Failed to cleanup content claims", t);
}
}
}
@ -1146,7 +1139,7 @@ public class FileSystemRepository implements ContentRepository {
}
@Override
public boolean isAccessible(final ContentClaim contentClaim) throws IOException {
public boolean isAccessible(final ContentClaim contentClaim) {
if (contentClaim == null) {
return false;
}
@ -1200,30 +1193,11 @@ public class FileSystemRepository implements ContentRepository {
return writableClaimStreams.size();
}
protected ConcurrentMap<ResourceClaim, ByteCountingOutputStream> getWritableClaimStreams() {
return writableClaimStreams;
}
protected ByteCountingOutputStream getWritableClaimStreamByResourceClaim(ResourceClaim rc) {
return writableClaimStreams.get(rc);
}
protected ResourceClaimManager getResourceClaimManager() {
return resourceClaimManager;
}
protected BlockingQueue<ClaimLengthPair> getWritableClaimQueue() {
return writableClaimQueue;
}
protected long getMaxAppendableClaimLength() {
return maxAppendableClaimLength;
}
protected boolean isAlwaysSync() {
return alwaysSync;
}
// marked protected for visibility and ability to override for unit tests.
protected boolean archive(final Path curPath) throws IOException {
// check if already archived
@ -1270,7 +1244,7 @@ public class FileSystemRepository implements ContentRepository {
final String creationTimestamp = filename.substring(0, dashIndex);
try {
return Long.parseLong(creationTimestamp);
} catch (final NumberFormatException nfe) {
} catch (final NumberFormatException ignored) {
}
}
@ -1294,17 +1268,16 @@ public class FileSystemRepository implements ContentRepository {
return (oldestArchiveDate <= removalTimeThreshold);
}
private long destroyExpiredArchives(final String containerName, final Path container) throws IOException {
private void destroyExpiredArchives(final String containerName, final Path container) throws IOException {
archiveExpirationLog.debug("Destroying Expired Archives for Container {}", containerName);
final List<ArchiveInfo> notYetExceedingThreshold = new ArrayList<>();
long removalTimeThreshold = System.currentTimeMillis() - maxArchiveMillis;
long oldestArchiveDateFound = System.currentTimeMillis();
// determine how much space we must have in order to stop deleting old data
final Long minRequiredSpace = minUsableContainerBytesForArchive.get(containerName);
if (minRequiredSpace == null) {
archiveExpirationLog.debug("Could not determine minimum required space so will not destroy any archived data");
return -1L;
return;
}
final long usableSpace = getContainerUsableSpace(containerName);
@ -1339,6 +1312,10 @@ public class FileSystemRepository implements ContentRepository {
// If so, then we call poll() to remove it
if (freed < toFree || getLastModTime(toDelete.toPath()) < removalTimeThreshold) {
toDelete = fileQueue.poll(); // remove the head of the queue, which is already stored in 'toDelete'
if (toDelete == null) {
continue;
}
Files.deleteIfExists(toDelete.toPath());
containerState.decrementArchiveCount();
LOG.debug("Deleted archived ContentClaim with ID {} from Container {} because the archival size was exceeding the max configured size", toDelete.getName(), containerName);
@ -1346,7 +1323,7 @@ public class FileSystemRepository implements ContentRepository {
deleteCount++;
}
// If we'd freed up enough space, we're done... unless the next file needs to be destroyed based on time.
// If we've freed up enough space, we're done... unless the next file needs to be destroyed based on time.
if (freed >= toFree) {
// If the last mod time indicates that it should be removed, just continue loop.
if (deleteBasedOnTimestamp(fileQueue, removalTimeThreshold)) {
@ -1369,7 +1346,7 @@ public class FileSystemRepository implements ContentRepository {
deleteCount, containerName, new Date(oldestArchiveDate), millis);
}
return oldestArchiveDate;
return;
}
} catch (final IOException ioe) {
LOG.warn("Failed to delete {} from archive due to {}", toDelete, ioe.toString());
@ -1393,7 +1370,8 @@ public class FileSystemRepository implements ContentRepository {
try {
final long timestampThreshold = removalTimeThreshold;
Files.walkFileTree(archive, new SimpleFileVisitor<Path>() {
Files.walkFileTree(archive, new SimpleFileVisitor<>() {
@Override
public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) throws IOException {
if (attrs.isDirectory()) {
@ -1433,12 +1411,7 @@ public class FileSystemRepository implements ContentRepository {
final long deleteExpiredMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
// Sort the list according to last modified time
Collections.sort(notYetExceedingThreshold, new Comparator<ArchiveInfo>() {
@Override
public int compare(final ArchiveInfo o1, final ArchiveInfo o2) {
return Long.compare(o1.getLastModTime(), o2.getLastModTime());
}
});
notYetExceedingThreshold.sort(Comparator.comparing(ArchiveInfo::getLastModTime));
final long sortRemainingMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS) - deleteExpiredMillis;
@ -1492,10 +1465,6 @@ public class FileSystemRepository implements ContentRepository {
oldestContainerArchive = notYetExceedingThreshold.get(0).getLastModTime();
}
if (oldestContainerArchive < oldestArchiveDateFound) {
oldestArchiveDateFound = oldestContainerArchive;
}
// Queue up the files in the order that they should be destroyed so that we don't have to scan the directories for a while.
for (final ArchiveInfo toEnqueue : notYetExceedingThreshold.subList(0, Math.min(100000, notYetExceedingThreshold.size()))) {
fileQueue.offer(toEnqueue);
@ -1504,7 +1473,7 @@ public class FileSystemRepository implements ContentRepository {
final long cleanupMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS) - deleteOldestMillis - sortRemainingMillis - deleteExpiredMillis;
LOG.debug("Oldest Archive Date for Container {} is {}; delete expired = {} ms, sort remaining = {} ms, delete oldest = {} ms, cleanup = {} ms",
containerName, new Date(oldestContainerArchive), deleteExpiredMillis, sortRemainingMillis, deleteOldestMillis, cleanupMillis);
return oldestContainerArchive;
return;
}
private class ArchiveOrDestroyDestructableClaims implements Runnable {
@ -1618,27 +1587,9 @@ public class FileSystemRepository implements ContentRepository {
@Override
public void run() {
try {
if (oldestArchiveDate.get() > System.currentTimeMillis() - maxArchiveMillis) {
final Long minRequiredSpace = minUsableContainerBytesForArchive.get(containerName);
if (minRequiredSpace == null) {
return;
}
try {
final long usableSpace = getContainerUsableSpace(containerName);
if (usableSpace > minRequiredSpace) {
return;
}
} catch (final Exception e) {
LOG.error("Failed to determine space available in container {}; will attempt to cleanup archive", containerName);
}
}
Thread.currentThread().setName("Cleanup Archive for " + containerName);
final long oldestContainerArchive;
try {
oldestContainerArchive = destroyExpiredArchives(containerName, containerPath);
destroyExpiredArchives(containerName, containerPath);
final ContainerState containerState = containerStateMap.get(containerName);
containerState.signalCreationReady(); // indicate that we've finished cleaning up the archive.
@ -1649,22 +1600,6 @@ public class FileSystemRepository implements ContentRepository {
}
return;
}
if (oldestContainerArchive < 0L) {
boolean updated;
do {
final long oldest = oldestArchiveDate.get();
if (oldestContainerArchive < oldest) {
updated = oldestArchiveDate.compareAndSet(oldest, oldestContainerArchive);
if (updated && LOG.isDebugEnabled()) {
LOG.debug("Oldest Archive Date is now {}", new Date(oldestContainerArchive));
}
} else {
updated = true;
}
} while (!updated);
}
} catch (final Throwable t) {
LOG.error("Failed to cleanup archive for container {} due to {}", containerName, t.toString());
LOG.error("", t);
@ -1751,6 +1686,9 @@ public class FileSystemRepository implements ContentRepository {
eventReporter.reportEvent(Severity.WARNING, "FileSystemRepository", message);
condition.await();
} catch (final InterruptedException e) {
LOG.warn("Interrupted while waiting for Content Repository archive expiration", e);
Thread.currentThread().interrupt();
return;
}
}
} finally {
@ -1788,6 +1726,10 @@ public class FileSystemRepository implements ContentRepository {
archivedFileCount.incrementAndGet();
}
public void incrementArchiveCount(final int count) {
archivedFileCount.addAndGet(count);
}
public long getArchiveCount() {
return archivedFileCount.get();
}
@ -1854,101 +1796,29 @@ public class FileSystemRepository implements ContentRepository {
* 1 second (1000 milliseconds). If attempt is made to set lower value a
* warning will be logged and the method will return minimum value of 1000
*/
private long determineCleanupInterval(NiFiProperties properties) {
long cleanupInterval = DEFAULT_CLEANUP_INTERVAL_MILLIS;
String archiveCleanupFrequency = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY);
if (archiveCleanupFrequency != null) {
try {
cleanupInterval = FormatUtils.getTimeDuration(archiveCleanupFrequency.trim(), TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new RuntimeException(
"Invalid value set for property " + NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY);
}
if (cleanupInterval < MIN_CLEANUP_INTERVAL_MILLIS) {
LOG.warn("The value of " + NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY + " property is set to '"
+ archiveCleanupFrequency + "' which is "
+ "below the allowed minimum of 1 second (1000 milliseconds). Minimum value of 1 sec will be used as scheduling interval for archive cleanup task.");
cleanupInterval = MIN_CLEANUP_INTERVAL_MILLIS;
}
private long determineCleanupInterval(final NiFiProperties properties) {
final String archiveCleanupFrequency = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY);
if (archiveCleanupFrequency == null) {
return DEFAULT_CLEANUP_INTERVAL_MILLIS;
}
long cleanupInterval;
try {
cleanupInterval = Math.round(FormatUtils.getPreciseTimeDuration(archiveCleanupFrequency.trim(), TimeUnit.MILLISECONDS));
} catch (final Exception e) {
throw new RuntimeException("Invalid value set for property " + NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY, e);
}
if (cleanupInterval < MIN_CLEANUP_INTERVAL_MILLIS) {
LOG.warn("The value of the '{}' property is set to [{}] which is below the allowed minimum of 1 second. Will use 1 second as scheduling interval for archival cleanup task.",
NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY, archiveCleanupFrequency);
cleanupInterval = MIN_CLEANUP_INTERVAL_MILLIS;
}
return cleanupInterval;
}
private static class ScanForActiveResourceClaims extends SimpleFileVisitor<Path> {
private static final Pattern SECTION_NAME_PATTERN = Pattern.compile("\\d{0,4}");
private final String containerName;
private final ResourceClaimManager resourceClaimManager;
private final Set<String> containerNames;
private final Path rootPath;
private final Set<ResourceClaim> activeResourceClaims = new HashSet<>();
private String sectionName = null;
public ScanForActiveResourceClaims(final Path rootPath, final String containerName, final ResourceClaimManager resourceClaimManager, final Set<String> containerNames) {
this.rootPath = rootPath;
this.containerName = containerName;
this.resourceClaimManager = resourceClaimManager;
this.containerNames = containerNames;
}
public Set<ResourceClaim> getActiveResourceClaims() {
return activeResourceClaims;
}
@Override
public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
LOG.warn("Content repository contains un-readable file or directory '" + file.getFileName() + "'. Skipping. ", exc);
return FileVisitResult.SKIP_SUBTREE;
}
@Override
public FileVisitResult preVisitDirectory(final Path dir, final BasicFileAttributes attrs) throws IOException {
if (dir.equals(rootPath)) {
return FileVisitResult.CONTINUE;
}
// Check if this is an 'archive' directory
final String dirName = dir.toFile().getName();
if (containerNames.contains(dirName)) {
LOG.debug("Obtaining active resource claims, will traverse into Container {}", dirName);
return FileVisitResult.CONTINUE;
}
if (SECTION_NAME_PATTERN.matcher(dirName).matches()) {
LOG.debug("Obtaining active resource claims, will traverse into Section {}", dirName);
sectionName = dirName;
return FileVisitResult.CONTINUE;
} else {
LOG.debug("Obtaining active resource claims, will NOT traverse into sub-directory {}", dirName);
return FileVisitResult.SKIP_SUBTREE;
}
}
@Override
public FileVisitResult visitFile(final Path path, final BasicFileAttributes attrs) throws IOException {
if (attrs.isDirectory()) {
return FileVisitResult.CONTINUE;
}
final File file = path.toFile();
if (sectionName == null || !sectionName.equals(file.getParentFile().getName())) {
LOG.debug("Obtaining active resource claims, will NOT consider {} because its parent is not the current section", file);
return FileVisitResult.CONTINUE;
}
final String identifier = file.getName();
ResourceClaim resourceClaim = resourceClaimManager.getResourceClaim(containerName, sectionName, identifier);
if (resourceClaim == null) {
resourceClaim = resourceClaimManager.newResourceClaim(containerName, sectionName, identifier, false, false);
}
activeResourceClaims.add(resourceClaim);
return FileVisitResult.CONTINUE;
}
}
protected class ContentRepositoryOutputStream extends OutputStream {
protected final StandardContentClaim scc;

View File

@ -55,16 +55,10 @@ import java.util.Objects;
public class EncryptedFileSystemRepository extends FileSystemRepository {
private static final Logger logger = LoggerFactory.getLogger(EncryptedFileSystemRepository.class);
private RepositoryEncryptor<OutputStream, InputStream> repositoryEncryptor;
private final RepositoryEncryptor<OutputStream, InputStream> repositoryEncryptor;
private String keyId;
private final String keyId;
/**
* Default no args constructor for service loading only
*/
public EncryptedFileSystemRepository() {
}
public EncryptedFileSystemRepository(final NiFiProperties niFiProperties) throws IOException {
super(niFiProperties);

View File

@ -30,6 +30,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
@ -50,6 +51,7 @@ import java.nio.file.StandardOpenOption;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@ -135,6 +137,107 @@ public class TestFileSystemRepository {
assertTrue(repository.isArchived(Paths.get("a/b/c/archive/1.txt")));
}
@Test
@Timeout(30)
public void testClaimsArchivedWhenMarkedDestructable() throws IOException, InterruptedException {
final ContentClaim contentClaim = repository.create(false);
final long configuredAppendableClaimLength = DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), DataUnit.B).longValue();
final Map<String, Path> containerPaths = nifiProperties.getContentRepositoryPaths();
assertEquals(1, containerPaths.size());
final String containerName = containerPaths.keySet().iterator().next();
try (final OutputStream out = repository.write(contentClaim)) {
long bytesWritten = 0L;
final byte[] bytes = "Hello World".getBytes(StandardCharsets.UTF_8);
while (bytesWritten <= configuredAppendableClaimLength) {
out.write(bytes);
bytesWritten += bytes.length;
}
}
assertEquals(0, repository.getArchiveCount(containerName));
assertEquals(0, claimManager.decrementClaimantCount(contentClaim.getResourceClaim()));
claimManager.markDestructable(contentClaim.getResourceClaim());
// The claim should become archived but it may take a few seconds, as it's handled by background threads
while (repository.getArchiveCount(containerName) != 1) {
Thread.sleep(50L);
}
}
@Test
@Timeout(value=30)
public void testArchivedClaimRemovedDueToAge() throws IOException, InterruptedException {
// Recreate Repository with specific properties
final Map<String, String> propertyOverrides = new HashMap<>();
propertyOverrides.put(NiFiProperties.CONTENT_ARCHIVE_MAX_RETENTION_PERIOD, "2 sec");
propertyOverrides.put(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY, "1 sec");
propertyOverrides.put(NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE, "99%");
recreateRepositoryWithPropertyOverrides(propertyOverrides);
final Map<String, Path> containerPaths = nifiProperties.getContentRepositoryPaths();
assertEquals(1, containerPaths.size());
final Path containerPath = containerPaths.values().iterator().next();
// Perform a few iterations to ensure that it works not just the first time, since there is a lot of logic on initialization.
for (int i=0; i< 3; i++) {
final File archiveDir = containerPath.resolve(String.valueOf(i)).resolve("archive").toFile();
assertTrue(archiveDir.mkdirs());
final File archivedFile = new File(archiveDir, "1234");
try (final OutputStream fos = new FileOutputStream(archivedFile)) {
fos.write("Hello World".getBytes());
}
while (archivedFile.exists()) {
Thread.sleep(50L);
}
}
}
@Test
@Timeout(value=30)
public void testArchivedClaimRemovedDueToDiskUsage() throws IOException, InterruptedException {
// Recreate Repository with specific properties
final Map<String, String> propertyOverrides = new HashMap<>();
propertyOverrides.put(NiFiProperties.CONTENT_ARCHIVE_MAX_RETENTION_PERIOD, "555 days");
propertyOverrides.put(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY, "1 sec");
propertyOverrides.put(NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE, "1%");
recreateRepositoryWithPropertyOverrides(propertyOverrides);
final Map<String, Path> containerPaths = nifiProperties.getContentRepositoryPaths();
assertEquals(1, containerPaths.size());
final Path containerPath = containerPaths.values().iterator().next();
// Perform a few iterations to ensure that it works not just the first time, since there is a lot of logic on initialization.
for (int i=0; i< 3; i++) {
final File archiveDir = containerPath.resolve(String.valueOf(i)).resolve("archive").toFile();
assertTrue(archiveDir.mkdirs());
final File archivedFile = new File(archiveDir, "1234");
try (final OutputStream fos = new FileOutputStream(archivedFile)) {
fos.write("Hello World".getBytes());
}
while (archivedFile.exists()) {
Thread.sleep(50L);
}
}
}
private void recreateRepositoryWithPropertyOverrides(final Map<String, String> propertyOverrides) throws IOException {
repository.shutdown();
nifiProperties = NiFiProperties.createBasicNiFiProperties(TestFileSystemRepository.class.getResource("/conf/nifi.properties").getFile(), propertyOverrides);
repository = new FileSystemRepository(nifiProperties);
claimManager = new StandardResourceClaimManager();
repository.initialize(new StandardContentRepositoryContext(claimManager, EventReporter.NO_OP));
repository.purge();
}
@Test
public void testUnreferencedFilesAreArchivedOnCleanup() throws IOException {
final Map<String, Path> containerPaths = nifiProperties.getContentRepositoryPaths();

View File

@ -367,10 +367,6 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering
}
}
protected void addTempComponent(final ConfigurableComponent instance, final BundleCoordinate coordinate) {
final String cacheKey = getClassBundleKey(instance.getClass().getCanonicalName(), coordinate);
tempComponentLookup.put(cacheKey, instance);
}
/**
* Registers extension for the specified type from the specified Bundle.

View File

@ -136,10 +136,12 @@ public class NiFiClientUtil {
private final NiFiClient nifiClient;
private final String nifiVersion;
private final String testName;
public NiFiClientUtil(final NiFiClient client, final String nifiVersion) {
public NiFiClientUtil(final NiFiClient client, final String nifiVersion, final String testName) {
this.nifiClient = client;
this.nifiVersion = nifiVersion;
this.testName = testName.replace("()", "");
}
private ProcessorClient getProcessorClient() {
@ -199,7 +201,21 @@ public class NiFiClientUtil {
entity.setRevision(createNewRevision());
entity.setDisconnectedNodeAcknowledged(true);
return getProcessorClient().createProcessor(processGroupId, entity);
final ProcessorEntity processor = getProcessorClient().createProcessor(processGroupId, entity);
logger.info("Created Processor [type={}, id={}, name={}, parentGroupId={}] for Test [{}]", simpleName(type), processor.getId(), processor.getComponent().getName(), processGroupId, testName);
return processor;
}
private String simpleName(final String type) {
final int lastIndex = type.lastIndexOf(".");
if (lastIndex <= 0) {
return type;
}
if (lastIndex == type.length() -1) {
return type;
}
return type.substring(lastIndex + 1);
}
public ParameterProviderEntity createParameterProvider(final String simpleTypeName) throws NiFiClientException, IOException {
@ -298,11 +314,15 @@ public class NiFiClientUtil {
entity.setRevision(createNewRevision());
entity.setDisconnectedNodeAcknowledged(true);
final ControllerServiceEntity service;
if (processGroupId == null) {
return nifiClient.getControllerClient().createControllerService(entity);
service = nifiClient.getControllerClient().createControllerService(entity);
} else {
service = nifiClient.getControllerServicesClient().createControllerService(processGroupId, entity);
}
return nifiClient.getControllerServicesClient().createControllerService(processGroupId, entity);
logger.info("Created Controller Service [type={}, id={}, name={}, groupId={}] for Test [{}]", simpleName(type), service.getId(), service.getComponent().getName(), processGroupId, testName);
return service;
}
public BundleDTO getTestBundle() {
@ -323,7 +343,10 @@ public class NiFiClientUtil {
entity.setRevision(createNewRevision());
entity.setDisconnectedNodeAcknowledged(true);
return nifiClient.getControllerClient().createReportingTask(entity);
final ReportingTaskEntity reportingTask = nifiClient.getControllerClient().createReportingTask(entity);
logger.info("Created Reporting Task [type={}, id={}] for Test [{}]", simpleName(type), reportingTask.getId(), testName);
return reportingTask;
}
public ReportingTaskEntity updateReportingTaskProperties(final ReportingTaskEntity currentEntity, final Map<String, String> properties) throws NiFiClientException, IOException {
@ -491,6 +514,8 @@ public class NiFiClientUtil {
parameterProviderConfiguration);
final ParameterContextEntity createdContextEntity = nifiClient.getParamContextClient().createParamContext(contextEntity);
logger.info("Created Parameter Context [id={}, name={}] for Test [{}]", createdContextEntity.getId(), contextName, testName);
return createdContextEntity;
}
@ -1140,7 +1165,15 @@ public class NiFiClientUtil {
connectionEntity.setRevision(createNewRevision());
connectionEntity.setDisconnectedNodeAcknowledged(true);
return getConnectionClient().createConnection(connectionGroupId, connectionEntity);
final ConnectionEntity connection = getConnectionClient().createConnection(connectionGroupId, connectionEntity);
final String sourceInfo = String.format("[type=%s, id=%s, name=%s, groupId=%s]", source.getType(), source.getId(), source.getName(), source.getGroupId());
final String destinationInfo = String.format("[type=%s, id=%s, name=%s, groupId=%s]", destination.getType(), destination.getId(), destination.getName(), destination.getGroupId());
logger.info("Created Connection [id={}, source={}, destination={}, relationships={}, parentGroupId={}] for Test [{}]",
connection.getId(), sourceInfo, destinationInfo, relationships, connectionGroupId, testName);
return connection;
}
public ConnectableDTO createConnectableDTO(final ProcessorEntity processor) {
@ -1223,7 +1256,9 @@ public class NiFiClientUtil {
entity.setComponent(component);
entity.setRevision(createNewRevision());
return nifiClient.getRemoteProcessGroupClient().createRemoteProcessGroup(parentGroupId, entity);
final RemoteProcessGroupEntity rpg = nifiClient.getRemoteProcessGroupClient().createRemoteProcessGroup(parentGroupId, entity);
logger.info("Created Remote Process Group [id={}, protocol={}, url={}, parentGroupId={}] for Test [{}]", rpg.getId(), transportProtocol, parentGroupId, testName);
return rpg;
}
public PortEntity createRemoteInputPort(final String parentGroupId, final String portName) throws NiFiClientException, IOException {
@ -1395,6 +1430,7 @@ public class NiFiClientUtil {
childGroupEntity.setComponent(component);
final ProcessGroupEntity childGroup = nifiClient.getProcessGroupClient().createProcessGroup(parentGroupId, childGroupEntity);
logger.info("Created Process Group [id={}, name={}, parentGroupId={}] for Test [{}]", childGroup.getId(), name, parentGroupId, testName);
return childGroup;
}
@ -1407,7 +1443,9 @@ public class NiFiClientUtil {
inputPortEntity.setRevision(createNewRevision());
inputPortEntity.setComponent(component);
return nifiClient.getInputPortClient().createInputPort(groupId, inputPortEntity);
final PortEntity port = nifiClient.getInputPortClient().createInputPort(groupId, inputPortEntity);
logger.info("Created Input Port [id={}, name={}, parentGroupId={}] for Test [{}]", port.getId(), name, groupId, testName);
return port;
}
public PortEntity createOutputPort(final String name, final String groupId) throws NiFiClientException, IOException {
@ -1419,7 +1457,9 @@ public class NiFiClientUtil {
outputPortEntity.setRevision(createNewRevision());
outputPortEntity.setComponent(component);
return nifiClient.getOutputPortClient().createOutputPort(groupId, outputPortEntity);
final PortEntity port = nifiClient.getOutputPortClient().createOutputPort(groupId, outputPortEntity);
logger.info("Created Output Port [id={}, name={}, parentGroupId={}] for Test [{}]", port.getId(), name, groupId, testName);
return port;
}
public ProvenanceEntity queryProvenance(final Map<SearchableField, ProvenanceSearchValueDTO> searchTerms, final Long startTime, final Long endTime) throws NiFiClientException, IOException {

View File

@ -109,6 +109,10 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
}
}
protected TestInfo getTestInfo() {
return testInfo;
}
@AfterAll
public static void cleanup() {
final NiFiInstance nifi = nifiRef.get();
@ -252,7 +256,7 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
protected void setupClient(final int apiPort) {
nifiClient = createClient(apiPort);
clientUtil = new NiFiClientUtil(nifiClient, getNiFiVersion());
clientUtil = new NiFiClientUtil(nifiClient, getNiFiVersion(), getTestName());
}
protected NiFiClientUtil getClientUtil() {

View File

@ -23,6 +23,8 @@ import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.SnippetDTO;
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
@ -34,6 +36,7 @@ import org.apache.nifi.web.api.entity.PortEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.SnippetEntity;
import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
import org.apache.nifi.web.api.entity.VersionedFlowUpdateRequestEntity;
import org.junit.jupiter.api.Assertions;
@ -43,6 +46,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -142,6 +146,57 @@ public class RegistryClientIT extends NiFiSystemIT {
}
@Test
public void testChangeConnectionDestinationRemoveOldAndMoveGroup() throws NiFiClientException, IOException, InterruptedException {
final FlowRegistryClientEntity clientEntity = registerClient();
final NiFiClientUtil util = getClientUtil();
// Create a PG that contains Generate -> Count
final ProcessGroupEntity parent = util.createProcessGroup("Parent", "root");
final ProcessorEntity generate = util.createProcessor("GenerateFlowFile", parent.getId());
final ProcessorEntity countProcessor = util.createProcessor("CountFlowFiles", parent.getId());
final ConnectionEntity generateToCount = util.createConnection(generate, countProcessor, "success");
// Save the flow as v1
final VersionControlInformationEntity v1Vci = util.startVersionControl(parent, clientEntity, "testChangeConnectionDestinationRemoveOldAndMoveGroup", "Parent");
// Create a Terminate processor and change flow to be:
// Generate -> Terminate - remove the old Count Processor
final ProcessorEntity terminate = util.createProcessor("TerminateFlowFile", parent.getId());
generateToCount.setDestinationId(terminate.getId());
generateToCount.getComponent().setDestination(util.createConnectableDTO(terminate));
final ConnectionEntity generateToTerminate = getNifiClient().getConnectionClient().updateConnection(generateToCount);
getNifiClient().getProcessorClient().deleteProcessor(countProcessor);
final ProcessGroupEntity childGroup = util.createProcessGroup("Child", parent.getId());
// Move the Generate, Terminate, and Connection to the child group
final Map<String, RevisionDTO> processorRevisions = new HashMap<>();
processorRevisions.put(generate.getId(), generate.getRevision());
processorRevisions.put(terminate.getId(), terminate.getRevision());
final SnippetDTO snippetDto = new SnippetDTO();
snippetDto.setConnections(Collections.singletonMap(generateToTerminate.getId(), generateToTerminate.getRevision()));
snippetDto.setProcessors(processorRevisions);
snippetDto.setParentGroupId(parent.getId());
final SnippetEntity snippet = new SnippetEntity();
snippet.setSnippet(snippetDto);
final SnippetEntity createdSnippet = getNifiClient().getSnippetClient().createSnippet(snippet);
createdSnippet.getSnippet().setParentGroupId(childGroup.getId());
getNifiClient().getSnippetClient().updateSnippet(createdSnippet);
// Save the flow as v2
util.saveFlowVersion(parent, clientEntity, v1Vci);
util.changeFlowVersion(parent.getId(), 1);
util.changeFlowVersion(parent.getId(), 2);
}
@Test
public void testControllerServiceUpdateWhileRunning() throws NiFiClientException, IOException, InterruptedException {
final FlowRegistryClientEntity clientEntity = registerClient();
@ -199,6 +254,7 @@ public class RegistryClientIT extends NiFiSystemIT {
assertEquals("1", thirdFlowFileAttributes.get("count"));
}
@Test
public void testChangeVersionWithPortMoveBetweenGroups() throws NiFiClientException, IOException, InterruptedException {
final FlowRegistryClientEntity clientEntity = registerClient(new File("src/test/resources/versioned-flows"));
@ -310,7 +366,7 @@ public class RegistryClientIT extends NiFiSystemIT {
final ProcessGroupEntity group = getClientUtil().createProcessGroup("Outer", "root");
final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", group.getId());
final VersionControlInformationEntity vci = getClientUtil().startVersionControl(group, clientEntity, "First Bucket", "First Flow");
getClientUtil().startVersionControl(group, clientEntity, "First Bucket", "First Flow");
String versionedFlowState = getVersionedFlowState(group.getId(), "root");
assertEquals("UP_TO_DATE", versionedFlowState);

View File

@ -28,4 +28,6 @@ public interface SnippetClient {
* @return the created entity
*/
SnippetEntity createSnippet(SnippetEntity snippet) throws NiFiClientException, IOException;
SnippetEntity updateSnippet(SnippetEntity snippet) throws NiFiClientException, IOException;
}

View File

@ -51,4 +51,20 @@ public class JerseySnippetClient extends AbstractJerseyClient implements Snippet
SnippetEntity.class
));
}
@Override
public SnippetEntity updateSnippet(final SnippetEntity snippet) throws NiFiClientException, IOException {
if (snippet == null) {
throw new IllegalArgumentException("Snippet entity cannot be null");
}
return executeAction("Error updating snippet", () -> {
final WebTarget target = snippetTarget
.path("/{id}")
.resolveTemplate("id", snippet.getSnippet().getId());
final Entity<SnippetEntity> entity = Entity.entity(snippet, MediaType.APPLICATION_JSON);
return getRequestBuilder(target).put(entity, SnippetEntity.class);
});
}
}