NIFI-905: Ensure that when archive threshold is hit, archived data is destroyed and if no archived data exists that Processors aren't blocked from updating content repo

This commit is contained in:
Mark Payne 2015-08-28 14:15:35 -04:00
parent 85cb5dd1e7
commit 3159cec782
1 changed files with 57 additions and 29 deletions

View File

@ -66,8 +66,8 @@ import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.LongHolder;
import org.apache.nifi.util.NiFiProperties;
@ -1086,42 +1086,44 @@ public class FileSystemRepository implements ContentRepository {
return Files.exists(getArchivePath(contentClaim.getResourceClaim()));
}
private void archive(final ResourceClaim claim) throws IOException {
private boolean archive(final ResourceClaim claim) throws IOException {
if (!archiveData) {
return;
return false;
}
synchronized (writableClaimQueue) {
final int claimantCount = claim == null ? 0 : resourceClaimManager.getClaimantCount(claim);
if (claimantCount > 0 || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) {
return;
return false;
}
}
final Path curPath = getPath(claim);
if (curPath == null) {
return;
return false;
}
archive(curPath);
final boolean archived = archive(curPath);
LOG.debug("Successfully moved {} to archive", claim);
return archived;
}
private void archive(final Path curPath) throws IOException {
private boolean archive(final Path curPath) throws IOException {
// check if already archived
final boolean alreadyArchived = ARCHIVE_DIR_NAME.equals(curPath.getParent().toFile().getName());
if (alreadyArchived) {
return;
return false;
}
final Path archivePath = getArchivePath(curPath);
if (curPath.equals(archivePath)) {
LOG.warn("Cannot archive {} because it is already archived", curPath);
return;
return false;
}
try {
Files.move(curPath, archivePath);
return true;
} catch (final NoSuchFileException nsfee) {
// If the current path exists, try to create archive path and do the move again.
// Otherwise, either the content was removed or has already been archived. Either way,
@ -1134,7 +1136,10 @@ public class FileSystemRepository implements ContentRepository {
// for the existence of the directory continually.
Files.createDirectories(archivePath.getParent());
Files.move(curPath, archivePath);
return true;
}
return false;
}
}
@ -1159,6 +1164,19 @@ public class FileSystemRepository implements ContentRepository {
return getLastModTime(file.toFile());
}
private boolean deleteBasedOnTimestamp(final BlockingQueue<ArchiveInfo> fileQueue, final long removalTimeThreshold) throws IOException {
// check next file's last mod time.
final ArchiveInfo nextFile = fileQueue.peek();
if (nextFile == null) {
// Continue on to queue up the files, in case the next file must be destroyed based on time.
return false;
}
// If the last mod time indicates that it should be removed, just continue loop.
final long oldestArchiveDate = getLastModTime(nextFile.toPath());
return (oldestArchiveDate <= removalTimeThreshold);
}
private long destroyExpiredArchives(final String containerName, final Path container) throws IOException {
final List<ArchiveInfo> notYetExceedingThreshold = new ArrayList<>();
final long removalTimeThreshold = System.currentTimeMillis() - maxArchiveMillis;
@ -1177,37 +1195,46 @@ public class FileSystemRepository implements ContentRepository {
final long startNanos = System.nanoTime();
final long toFree = minRequiredSpace - usableSpace;
final BlockingQueue<ArchiveInfo> fileQueue = archivedFiles.get(containerName);
ArchiveInfo toDelete;
int deleteCount = 0;
long freed = 0L;
while ((toDelete = fileQueue.poll()) != null) {
while ((toDelete = fileQueue.peek()) != null) {
try {
final long fileSize = toDelete.getSize();
// we use fileQueue.peek above instead of fileQueue.poll() because we don't always want to
// remove the head of the queue. Instead, we want to remove it only if we plan to delete it.
// In order to accomplish this, we just peek at the head and check if it should be deleted.
// If so, then we call poll() to remove it
if (freed < toFree || getLastModTime(toDelete.toPath()) < removalTimeThreshold) {
toDelete = fileQueue.poll(); // remove the head of the queue, which is already stored in 'toDelete'
Files.deleteIfExists(toDelete.toPath());
containerState.decrementArchiveCount();
LOG.debug("Deleted archived ContentClaim with ID {} from Container {} because the archival size was exceeding the max configured size", toDelete.getName(), containerName);
freed += fileSize;
deleteCount++;
}
// If we'd freed up enough space, we're done... unless the next file needs to be destroyed based on time.
if (freed >= toFree) {
// check next file's last mod time.
final ArchiveInfo nextFile = fileQueue.peek();
if (nextFile == null) {
// Continue on to queue up the files, in case the next file must be destroyed based on time.
break;
}
// If the last mod time indicates that it should be removed, just continue loop.
final long oldestArchiveDate = getLastModTime(nextFile.toPath());
if (oldestArchiveDate <= removalTimeThreshold) {
if (deleteBasedOnTimestamp(fileQueue, removalTimeThreshold)) {
continue;
}
final ArchiveInfo archiveInfo = fileQueue.peek();
final long oldestArchiveDate = archiveInfo == null ? System.currentTimeMillis() : getLastModTime(archiveInfo.toPath());
// Otherwise, we're done. Return the last mod time of the oldest file in the container's archive.
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
if (deleteCount > 0) {
LOG.info("Deleted {} files from archive for Container {}; oldest Archive Date is now {}; container cleanup took {} millis",
deleteCount, containerName, new Date(oldestArchiveDate), millis);
} else {
LOG.debug("Deleted {} files from archive for Container {}; oldest Archive Date is now {}; container cleanup took {} millis",
deleteCount, containerName, new Date(oldestArchiveDate), millis);
}
return oldestArchiveDate;
}
@ -1354,9 +1381,10 @@ public class FileSystemRepository implements ContentRepository {
for (final ResourceClaim claim : toRemove) {
if (archiveData) {
try {
archive(claim);
if (archive(claim)) {
containerState.incrementArchiveCount();
successCount++;
}
} catch (final Exception e) {
LOG.warn("Failed to archive {} due to {}", claim, e.toString());
if (LOG.isDebugEnabled()) {
@ -1376,7 +1404,7 @@ public class FileSystemRepository implements ContentRepository {
if (successCount == 0) {
LOG.debug("No ContentClaims archived/removed for Container {}", container);
} else {
LOG.info("Successfully {} {} Content Claims for Container {} in {} millis", archiveData ? "archived" : "destroyed", successCount, container, millis);
LOG.info("Successfully {} {} Resource Claims for Container {} in {} millis", archiveData ? "archived" : "destroyed", successCount, container, millis);
}
}