NIFI-10023: Ensure that any files that are archived upon startup properly increment the archival count in the content repo

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #6037
This commit is contained in:
Mark Payne 2022-05-12 11:10:36 -04:00 committed by Matthew Burgess
parent a528b89169
commit 4a60d1673e
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
3 changed files with 78 additions and 3 deletions

View File

@ -504,11 +504,11 @@ public class FileSystemRepository implements ContentRepository {
final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(containerName, sectionName, id, false, false); final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(containerName, sectionName, id, false, false);
if (resourceClaimManager.getClaimantCount(resourceClaim) == 0) { if (resourceClaimManager.getClaimantCount(resourceClaim) == 0) {
removeIncompleteContent(fileToRemove); removeIncompleteContent(fileToRemove, containerName);
} }
} }
private void removeIncompleteContent(final Path fileToRemove) { private void removeIncompleteContent(final Path fileToRemove, final String containerName) {
String fileDescription = null; String fileDescription = null;
try { try {
fileDescription = fileToRemove.toFile().getAbsolutePath() + " (" + Files.size(fileToRemove) + " bytes)"; fileDescription = fileToRemove.toFile().getAbsolutePath() + " (" + Files.size(fileToRemove) + " bytes)";
@ -520,7 +520,16 @@ public class FileSystemRepository implements ContentRepository {
try { try {
if (archiveData) { if (archiveData) {
archive(fileToRemove); final boolean archived = archive(fileToRemove);
if (archived) {
final ContainerState containerState = containerStateMap.get(containerName);
if (containerState == null) {
LOG.warn("Failed to increment container's archive count for {} because container {} could not be found", fileToRemove.toFile(), containerName);
} else {
containerState.incrementArchiveCount();
}
}
} else { } else {
Files.delete(fileToRemove); Files.delete(fileToRemove);
} }
@ -531,6 +540,16 @@ public class FileSystemRepository implements ContentRepository {
} }
} }
// Visible for testing
long getArchiveCount(String containerName) {
final ContainerState containerState = containerStateMap.get(containerName);
if (containerState == null) {
throw new IllegalArgumentException("No container exists with name " + containerName);
}
return containerState.getArchiveCount();
}
@Override @Override
public boolean isActiveResourceClaimsSupported() { public boolean isActiveResourceClaimsSupported() {
return true; return true;
@ -1769,6 +1788,10 @@ public class FileSystemRepository implements ContentRepository {
archivedFileCount.incrementAndGet(); archivedFileCount.incrementAndGet();
} }
public long getArchiveCount() {
return archivedFileCount.get();
}
public void decrementArchiveCount() { public void decrementArchiveCount() {
archivedFileCount.decrementAndGet(); archivedFileCount.decrementAndGet();
} }

View File

@ -55,6 +55,7 @@ import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -140,6 +141,55 @@ public class TestFileSystemRepository {
assertTrue(repository.isArchived(Paths.get("a/b/c/archive/1.txt"))); assertTrue(repository.isArchived(Paths.get("a/b/c/archive/1.txt")));
} }
@Test
public void testUnreferencedFilesAreArchivedOnCleanup() throws IOException {
final Map<String, Path> containerPaths = nifiProperties.getContentRepositoryPaths();
assertTrue(containerPaths.size() > 0);
for (final Map.Entry<String, Path> entry : containerPaths.entrySet()) {
final String containerName = entry.getKey();
final Path containerPath = entry.getValue();
final Path section1 = containerPath.resolve("1");
final Path file1 = section1.resolve("file-1");
Files.write(file1, "hello".getBytes(), StandardOpenOption.CREATE);
// Should be nothing in the archive at this point
assertEquals(0, repository.getArchiveCount(containerName));
// When we cleanup, we should see one file moved to archive
repository.cleanup();
assertEquals(1, repository.getArchiveCount(containerName));
}
}
@Test
public void testAlreadyArchivedFilesCounted() throws IOException {
// We want to make sure that the initialization code counts files in archive, so we need to create a new FileSystemRepository to do this.
repository.shutdown();
final Map<String, Path> containerPaths = nifiProperties.getContentRepositoryPaths();
assertTrue(containerPaths.size() > 0);
for (final Path containerPath : containerPaths.values()) {
final Path section1 = containerPath.resolve("1");
final Path archive = section1.resolve("archive");
Files.createDirectories(archive);
for (int i=0; i < 3; i++) {
final Path file1 = archive.resolve("file-" + i);
Files.write(file1, "hello".getBytes(), StandardOpenOption.CREATE);
}
}
repository = new FileSystemRepository(nifiProperties);
for (final String containerName : containerPaths.keySet()) {
assertEquals(3, repository.getArchiveCount(containerName));
}
}
@Test @Test
public void testContentNotFoundExceptionThrownIfResourceClaimTooShort() throws IOException { public void testContentNotFoundExceptionThrownIfResourceClaimTooShort() throws IOException {
final File contentFile = new File("target/content_repository/0/0.bin"); final File contentFile = new File("target/content_repository/0/0.bin");

View File

@ -48,6 +48,8 @@ nifi.swap.out.threads=4
nifi.content.claim.max.appendable.size=1 MB nifi.content.claim.max.appendable.size=1 MB
nifi.content.claim.max.flow.files=100 nifi.content.claim.max.flow.files=100
nifi.content.repository.directory.default=./target/content_repository nifi.content.repository.directory.default=./target/content_repository
nifi.content.repository.archive.enabled=true
nifi.content.repository.archive.max.usage.percentage=90%
# Provenance Repository Properties # Provenance Repository Properties
nifi.provenance.repository.storage.directory=./target/provenance_repository nifi.provenance.repository.storage.directory=./target/provenance_repository