From 0e7e511aec1ad2c4e602cf1ef789c52edf2cac7e Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 12 Jul 2024 15:58:32 -0400 Subject: [PATCH] NIFI-13546 Fixed Content Repository recovery when archive directories are missing Ensure on startup that Content Repo archive directories are created, if configured to do so, rather than creating them on demand. If IOException thrown when archiving file, delete it instead. Cleaned up some code duplication between remove(ResourceClaim) and archive(ResourceClaim) methods This closes #9079 Signed-off-by: David Handermann --- .../repository/FileSystemRepository.java | 159 ++++++++---------- .../repository/TestFileSystemRepository.java | 37 ++-- 2 files changed, 88 insertions(+), 108 deletions(-) diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index e7077f6584..16052beea2 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -16,6 +16,27 @@ */ package org.apache.nifi.controller.repository; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.controller.repository.claim.StandardContentClaim; +import org.apache.nifi.controller.repository.io.ContentClaimOutputStream; +import org.apache.nifi.controller.repository.io.LimitedInputStream; +import org.apache.nifi.engine.FlowEngine; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.stream.io.ByteCountingOutputStream; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.file.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.ByteArrayInputStream; import java.io.Closeable; import java.io.EOFException; @@ -28,7 +49,6 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.file.FileVisitResult; import java.nio.file.Files; -import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.SimpleFileVisitor; import java.nio.file.StandardOpenOption; @@ -57,26 +77,6 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.controller.repository.claim.ResourceClaim; -import org.apache.nifi.controller.repository.claim.ResourceClaimManager; -import org.apache.nifi.controller.repository.claim.StandardContentClaim; -import org.apache.nifi.controller.repository.io.ContentClaimOutputStream; -import org.apache.nifi.controller.repository.io.LimitedInputStream; -import org.apache.nifi.engine.FlowEngine; -import org.apache.nifi.events.EventReporter; -import org.apache.nifi.processor.DataUnit; -import org.apache.nifi.reporting.Severity; -import org.apache.nifi.stream.io.ByteCountingOutputStream; -import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream; -import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.util.StopWatch; -import org.apache.nifi.util.file.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Is thread safe @@ -145,10 +145,10 @@ public class FileSystemRepository implements ContentRepository { final long appendableClaimLengthCap = DataUnit.parseDataSize(APPENDABLE_CLAIM_LENGTH_CAP, DataUnit.B).longValue(); if (configuredAppendableClaimLength > appendableClaimLengthCap) { LOG.warn("Configured property '{}' with value {} exceeds cap of {}. Setting value to {}", - NiFiProperties.MAX_APPENDABLE_CLAIM_SIZE, - configuredAppendableClaimLength, - APPENDABLE_CLAIM_LENGTH_CAP, - APPENDABLE_CLAIM_LENGTH_CAP); + NiFiProperties.MAX_APPENDABLE_CLAIM_SIZE, + configuredAppendableClaimLength, + APPENDABLE_CLAIM_LENGTH_CAP, + APPENDABLE_CLAIM_LENGTH_CAP); this.maxAppendableClaimLength = appendableClaimLengthCap; } else { this.maxAppendableClaimLength = configuredAppendableClaimLength; @@ -173,7 +173,7 @@ public class FileSystemRepository implements ContentRepository { if (maxArchiveSize == null) { throw new RuntimeException("No value specified for property '" - + NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE + "' but archiving is enabled. You must configure the max disk usage in order to enable archiving."); + + NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE + "' but archiving is enabled. You must configure the max disk usage in order to enable archiving."); } if (!MAX_ARCHIVE_SIZE_PATTERN.matcher(maxArchiveSize.trim()).matches()) { @@ -210,7 +210,7 @@ public class FileSystemRepository implements ContentRepository { final long maxArchiveBytes = (long) (capacity * (1D - (maxArchiveRatio - 0.02))); minUsableContainerBytesForArchive.put(container.getKey(), maxArchiveBytes); LOG.info("Maximum Threshold for Container {} set to {} bytes; if volume exceeds this size, archived data will be deleted until it no longer exceeds this size", - containerName, maxArchiveBytes); + containerName, maxArchiveBytes); final long backPressureBytes = (long) (container.getValue().toFile().getTotalSpace() * archiveBackPressureRatio); final ContainerState containerState = new ContainerState(containerName, true, backPressureBytes, capacity); @@ -304,8 +304,11 @@ public class FileSystemRepository implements ContentRepository { realPath = Files.createDirectories(containerPath).toRealPath(); } + // Ensure that the directory exists for the section, including the archive directory, if configured to archive data. for (int i = 0; i < SECTIONS_PER_CONTAINER; i++) { - Files.createDirectories(realPath.resolve(String.valueOf(i))); + final Path sectionPath = realPath.resolve(String.valueOf(i)); + final Path toCreate = archiveData ? sectionPath.resolve(ARCHIVE_DIR_NAME) : sectionPath; + Files.createDirectories(toCreate); } realPathMap.put(containerName, realPath); @@ -727,33 +730,11 @@ public class FileSystemRepository implements ContentRepository { } private boolean remove(final ResourceClaim claim) { - if (claim == null) { + if (!cleanupResources(claim)) { return false; } - // If the claim is still in use, we won't remove it. - if (claim.isInUse()) { - return false; - } - - Path path = null; - try { - path = getPath(claim); - } catch (final ContentNotFoundException ignored) { - } - - // Ensure that we have no writable claim streams for this resource claim - final ByteCountingOutputStream bcos = writableClaimStreams.remove(claim); - LOG.debug("Removed Stream {} for {} from writableClaimStreams because Resource Claim was removed", bcos, claim); - - if (bcos != null) { - try { - bcos.close(); - } catch (final IOException e) { - LOG.warn("Failed to close Output Stream for {} due to {}", claim, e); - } - } - + final Path path = getPath(claim); if (path != null) { final File file = path.toFile(); if (!file.delete() && file.exists()) { @@ -1131,25 +1112,10 @@ public class FileSystemRepository implements ContentRepository { return false; } - if (claim.isInUse()) { + if (!cleanupResources(claim)) { return false; } - // If the claim count is decremented to 0 (<= 0 as a 'defensive programming' strategy), ensure that - // we close the stream if there is one. There may be a stream open if create() is called and then - // claimant count is removed without writing to the claim (or more specifically, without closing the - // OutputStream that is returned when calling write() ). - final OutputStream out = writableClaimStreams.remove(claim); - LOG.debug("Removed {} for {} from writableClaimStreams because Resource Claim was archived", out, claim); - - if (out != null) { - try { - out.close(); - } catch (final IOException ioe) { - LOG.warn("Unable to close Output Stream for {}", claim, ioe); - } - } - final Path curPath = getPath(claim); if (curPath == null) { return false; @@ -1160,6 +1126,33 @@ public class FileSystemRepository implements ContentRepository { return archived; } + private boolean cleanupResources(final ResourceClaim claim) { + if (claim == null) { + return false; + } + + if (claim.isInUse()) { + return false; + } + + // If the claim count is decremented to 0 (<= 0 as a 'defensive programming' strategy), ensure that + // we close the stream if there is one. There may be a stream open if create() is called and then + // claimant count is removed without writing to the claim (or more specifically, without closing the + // OutputStream that is returned when calling write() ). + final OutputStream out = writableClaimStreams.remove(claim); + LOG.debug("Removed {} for {} from writableClaimStreams because Resource Claim was archived or removed", out, claim); + + if (out != null) { + try { + out.close(); + } catch (final IOException ioe) { + LOG.warn("Unable to close Output Stream for {}", claim, ioe); + } + } + + return true; + } + protected int getOpenStreamCount() { return writableClaimStreams.size(); } @@ -1183,26 +1176,8 @@ public class FileSystemRepository implements ContentRepository { return false; } - try { - Files.move(curPath, archivePath); - return true; - } catch (final NoSuchFileException nsfee) { - // If the current path exists, try to create archive path and do the move again. - // Otherwise, either the content was removed or has already been archived. Either way, - // there's nothing that can be done. - if (Files.exists(curPath)) { - // The archive directory doesn't exist. Create now and try operation again. - // We do it this way, rather than ensuring that the directory exists ahead of time because - // it will be rare for the directory not to exist and we would prefer to have the overhead - // of the Exception being thrown in these cases, rather than have the overhead of checking - // for the existence of the directory continually. - Files.createDirectories(archivePath.getParent()); - Files.move(curPath, archivePath); - return true; - } - - return false; - } + Files.move(curPath, archivePath); + return true; } private long getLastModTime(final File file) { @@ -1466,7 +1441,12 @@ public class FileSystemRepository implements ContentRepository { successCount++; } } catch (final Exception e) { - LOG.warn("Failed to archive {}", claim, e); + final boolean removed = remove(claim); + if (removed) { + LOG.warn("Failed to archive {} but was able to cleanup resources; removed file instead of archiving.", claim); + } else { + LOG.warn("Failed to archive {} and unable to remove file.", claim, e); + } } } else if (remove(claim)) { successCount++; @@ -1929,4 +1909,5 @@ public class FileSystemRepository implements ContentRepository { return scc; } } + } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java index a69fddaa44..7fd3c02931 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java @@ -16,6 +16,24 @@ */ package org.apache.nifi.controller.repository; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.controller.repository.claim.StandardContentClaim; +import org.apache.nifi.controller.repository.claim.StandardResourceClaim; +import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; +import org.apache.nifi.controller.repository.util.DiskUtils; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.NiFiProperties; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; @@ -39,23 +57,6 @@ import java.util.Locale; import java.util.Map; import java.util.Random; import java.util.concurrent.TimeUnit; -import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.controller.repository.claim.ResourceClaim; -import org.apache.nifi.controller.repository.claim.StandardContentClaim; -import org.apache.nifi.controller.repository.claim.StandardResourceClaim; -import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; -import org.apache.nifi.controller.repository.util.DiskUtils; -import org.apache.nifi.events.EventReporter; -import org.apache.nifi.processor.DataUnit; -import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.util.NiFiProperties; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; -import org.junit.jupiter.api.condition.DisabledOnOs; -import org.junit.jupiter.api.condition.OS; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -179,7 +180,6 @@ public class TestFileSystemRepository { // Perform a few iterations to ensure that it works not just the first time, since there is a lot of logic on initialization. for (int i = 0; i < 3; i++) { final File archiveDir = containerPath.resolve(String.valueOf(i)).resolve("archive").toFile(); - assertTrue(archiveDir.mkdirs()); final File archivedFile = new File(archiveDir, "1234"); try (final OutputStream fos = new FileOutputStream(archivedFile)) { @@ -209,7 +209,6 @@ public class TestFileSystemRepository { // Perform a few iterations to ensure that it works not just the first time, since there is a lot of logic on initialization. for (int i = 0; i < 3; i++) { final File archiveDir = containerPath.resolve(String.valueOf(i)).resolve("archive").toFile(); - assertTrue(archiveDir.mkdirs()); final File archivedFile = new File(archiveDir, "1234"); try (final OutputStream fos = new FileOutputStream(archivedFile)) {