diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index 18a3de1cc0..d06b462928 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -66,8 +66,8 @@ import org.apache.nifi.controller.repository.claim.StandardContentClaim; import org.apache.nifi.controller.repository.io.LimitedInputStream; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.stream.io.ByteCountingOutputStream; -import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream; import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.LongHolder; import org.apache.nifi.util.NiFiProperties; @@ -1086,42 +1086,44 @@ public class FileSystemRepository implements ContentRepository { return Files.exists(getArchivePath(contentClaim.getResourceClaim())); } - private void archive(final ResourceClaim claim) throws IOException { + private boolean archive(final ResourceClaim claim) throws IOException { if (!archiveData) { - return; + return false; } synchronized (writableClaimQueue) { final int claimantCount = claim == null ? 0 : resourceClaimManager.getClaimantCount(claim); if (claimantCount > 0 || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) { - return; + return false; } } final Path curPath = getPath(claim); if (curPath == null) { - return; + return false; } - archive(curPath); + final boolean archived = archive(curPath); LOG.debug("Successfully moved {} to archive", claim); + return archived; } - private void archive(final Path curPath) throws IOException { + private boolean archive(final Path curPath) throws IOException { // check if already archived final boolean alreadyArchived = ARCHIVE_DIR_NAME.equals(curPath.getParent().toFile().getName()); if (alreadyArchived) { - return; + return false; } final Path archivePath = getArchivePath(curPath); if (curPath.equals(archivePath)) { LOG.warn("Cannot archive {} because it is already archived", curPath); - return; + return false; } try { Files.move(curPath, archivePath); + return true; } catch (final NoSuchFileException nsfee) { // If the current path exists, try to create archive path and do the move again. // Otherwise, either the content was removed or has already been archived. Either way, @@ -1134,7 +1136,10 @@ public class FileSystemRepository implements ContentRepository { // for the existence of the directory continually. Files.createDirectories(archivePath.getParent()); Files.move(curPath, archivePath); + return true; } + + return false; } } @@ -1159,6 +1164,19 @@ public class FileSystemRepository implements ContentRepository { return getLastModTime(file.toFile()); } + private boolean deleteBasedOnTimestamp(final BlockingQueue fileQueue, final long removalTimeThreshold) throws IOException { + // check next file's last mod time. + final ArchiveInfo nextFile = fileQueue.peek(); + if (nextFile == null) { + // Continue on to queue up the files, in case the next file must be destroyed based on time. + return false; + } + + // If the last mod time indicates that it should be removed, just continue loop. + final long oldestArchiveDate = getLastModTime(nextFile.toPath()); + return (oldestArchiveDate <= removalTimeThreshold); + } + private long destroyExpiredArchives(final String containerName, final Path container) throws IOException { final List notYetExceedingThreshold = new ArrayList<>(); final long removalTimeThreshold = System.currentTimeMillis() - maxArchiveMillis; @@ -1177,37 +1195,46 @@ public class FileSystemRepository implements ContentRepository { final long startNanos = System.nanoTime(); final long toFree = minRequiredSpace - usableSpace; final BlockingQueue fileQueue = archivedFiles.get(containerName); + ArchiveInfo toDelete; int deleteCount = 0; long freed = 0L; - while ((toDelete = fileQueue.poll()) != null) { + while ((toDelete = fileQueue.peek()) != null) { try { final long fileSize = toDelete.getSize(); - Files.deleteIfExists(toDelete.toPath()); - containerState.decrementArchiveCount(); - LOG.debug("Deleted archived ContentClaim with ID {} from Container {} because the archival size was exceeding the max configured size", toDelete.getName(), containerName); - freed += fileSize; - deleteCount++; + + // we use fileQueue.peek above instead of fileQueue.poll() because we don't always want to + // remove the head of the queue. Instead, we want to remove it only if we plan to delete it. + // In order to accomplish this, we just peek at the head and check if it should be deleted. + // If so, then we call poll() to remove it + if (freed < toFree || getLastModTime(toDelete.toPath()) < removalTimeThreshold) { + toDelete = fileQueue.poll(); // remove the head of the queue, which is already stored in 'toDelete' + Files.deleteIfExists(toDelete.toPath()); + containerState.decrementArchiveCount(); + LOG.debug("Deleted archived ContentClaim with ID {} from Container {} because the archival size was exceeding the max configured size", toDelete.getName(), containerName); + freed += fileSize; + deleteCount++; + } // If we'd freed up enough space, we're done... unless the next file needs to be destroyed based on time. if (freed >= toFree) { - // check next file's last mod time. - final ArchiveInfo nextFile = fileQueue.peek(); - if (nextFile == null) { - // Continue on to queue up the files, in case the next file must be destroyed based on time. - break; - } - // If the last mod time indicates that it should be removed, just continue loop. - final long oldestArchiveDate = getLastModTime(nextFile.toPath()); - if (oldestArchiveDate <= removalTimeThreshold) { + if (deleteBasedOnTimestamp(fileQueue, removalTimeThreshold)) { continue; } + final ArchiveInfo archiveInfo = fileQueue.peek(); + final long oldestArchiveDate = archiveInfo == null ? System.currentTimeMillis() : getLastModTime(archiveInfo.toPath()); + // Otherwise, we're done. Return the last mod time of the oldest file in the container's archive. final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); - LOG.info("Deleted {} files from archive for Container {}; oldest Archive Date is now {}; container cleanup took {} millis", + if (deleteCount > 0) { + LOG.info("Deleted {} files from archive for Container {}; oldest Archive Date is now {}; container cleanup took {} millis", deleteCount, containerName, new Date(oldestArchiveDate), millis); + } else { + LOG.debug("Deleted {} files from archive for Container {}; oldest Archive Date is now {}; container cleanup took {} millis", + deleteCount, containerName, new Date(oldestArchiveDate), millis); + } return oldestArchiveDate; } @@ -1354,9 +1381,10 @@ public class FileSystemRepository implements ContentRepository { for (final ResourceClaim claim : toRemove) { if (archiveData) { try { - archive(claim); - containerState.incrementArchiveCount(); - successCount++; + if (archive(claim)) { + containerState.incrementArchiveCount(); + successCount++; + } } catch (final Exception e) { LOG.warn("Failed to archive {} due to {}", claim, e.toString()); if (LOG.isDebugEnabled()) { @@ -1376,7 +1404,7 @@ public class FileSystemRepository implements ContentRepository { if (successCount == 0) { LOG.debug("No ContentClaims archived/removed for Container {}", container); } else { - LOG.info("Successfully {} {} Content Claims for Container {} in {} millis", archiveData ? "archived" : "destroyed", successCount, container, millis); + LOG.info("Successfully {} {} Resource Claims for Container {} in {} millis", archiveData ? "archived" : "destroyed", successCount, container, millis); } }