mirror of https://github.com/apache/nifi.git
NIFI-7992: Periodically check disk usage for content repo to see if backpressure should be applied. Log progress in background task. Improve performance of background cleanup task by not using an ArrayList Iterator and constantly calling remove, but instead waiting until the end of our cleanup loop and then removing from the list, in a single update, all elements that should be removed
This closes #4652. Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
parent
940bc3056c
commit
badcfe1ab7
|
@ -57,7 +57,6 @@ import java.util.Comparator;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -1340,34 +1339,45 @@ public class FileSystemRepository implements ContentRepository {
|
||||||
|
|
||||||
// Delete the oldest data
|
// Delete the oldest data
|
||||||
archiveExpirationLog.debug("Deleting data based on timestamp");
|
archiveExpirationLog.debug("Deleting data based on timestamp");
|
||||||
final Iterator<ArchiveInfo> itr = notYetExceedingThreshold.iterator();
|
int archiveFilesDeleted = 0;
|
||||||
int counter = 0;
|
long archiveBytesDeleted = 0L;
|
||||||
while (itr.hasNext()) {
|
for (final ArchiveInfo archiveInfo : notYetExceedingThreshold) {
|
||||||
final ArchiveInfo archiveInfo = itr.next();
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final Path path = archiveInfo.toPath();
|
final Path path = archiveInfo.toPath();
|
||||||
Files.deleteIfExists(path);
|
Files.deleteIfExists(path);
|
||||||
containerState.decrementArchiveCount();
|
containerState.decrementArchiveCount();
|
||||||
|
archiveBytesDeleted += archiveInfo.getSize();
|
||||||
LOG.debug("Deleted archived ContentClaim with ID {} from Container {} because the archival size was exceeding the max configured size", archiveInfo.getName(), containerName);
|
LOG.debug("Deleted archived ContentClaim with ID {} from Container {} because the archival size was exceeding the max configured size", archiveInfo.getName(), containerName);
|
||||||
|
|
||||||
// Check if we've freed enough space every 25 files that we destroy
|
// Check if we've freed enough space every 25 files that we destroy
|
||||||
if (++counter % 25 == 0) {
|
if (++archiveFilesDeleted % 25 == 0) {
|
||||||
if (getContainerUsableSpace(containerName) > minRequiredSpace) { // check if we can stop now
|
if (getContainerUsableSpace(containerName) > minRequiredSpace) { // check if we can stop now
|
||||||
LOG.debug("Finished cleaning up archive for Container {}", containerName);
|
LOG.debug("Finished cleaning up archive for Container {}", containerName);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If deleting a huge number of files, it can take a while. This may occur when users have a very large number of tiny
|
||||||
|
// FlowFiles and also have the nifi.content.claim.max.appendable.size property set to a low value. In such a case, this
|
||||||
|
// process may block processors from performing their job. As a result, we want to periodically log something to let
|
||||||
|
// users know what is going on, so that the system doesn't appear to just completely freeze up periodically.
|
||||||
|
if (archiveFilesDeleted % 25_000 == 0 && archiveFilesDeleted > 0) {
|
||||||
|
LOG.info("So far in this iteration, successfully deleted {} files ({}) from archive because the Content Repository size was exceeding the max configured size. Will continue " +
|
||||||
|
"deleting files from the archive until the usage drops below the threshold or until all {} archived files have been removed",
|
||||||
|
archiveFilesDeleted, FormatUtils.formatDataSize(archiveBytesDeleted), notYetExceedingThreshold.size());
|
||||||
|
}
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
LOG.warn("Failed to delete {} from archive due to {}", archiveInfo, ioe.toString());
|
LOG.warn("Failed to delete {} from archive due to {}", archiveInfo, ioe.toString());
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.warn("", ioe);
|
LOG.warn("", ioe);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
itr.remove();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Remove the first 'archiveFilesDeleted' elements from the list because those archive files were deleted.
|
||||||
|
notYetExceedingThreshold.subList(0, archiveFilesDeleted).clear();
|
||||||
|
LOG.info("Successfully deleted {} files ({}) from archive", archiveFilesDeleted, FormatUtils.formatDataSize(archiveBytesDeleted));
|
||||||
|
|
||||||
final long deleteOldestMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS) - sortRemainingMillis - deleteExpiredMillis;
|
final long deleteOldestMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS) - sortRemainingMillis - deleteExpiredMillis;
|
||||||
|
|
||||||
long oldestContainerArchive;
|
long oldestContainerArchive;
|
||||||
|
@ -1568,6 +1578,7 @@ public class FileSystemRepository implements ContentRepository {
|
||||||
private final Condition condition = lock.newCondition();
|
private final Condition condition = lock.newCondition();
|
||||||
|
|
||||||
private volatile long bytesUsed = 0L;
|
private volatile long bytesUsed = 0L;
|
||||||
|
private volatile long checkUsedCutoffTimestamp = 0L;
|
||||||
|
|
||||||
public ContainerState(final String containerName, final boolean archiveEnabled, final long backPressureBytes, final long capacity) {
|
public ContainerState(final String containerName, final boolean archiveEnabled, final long backPressureBytes, final long capacity) {
|
||||||
this.containerName = containerName;
|
this.containerName = containerName;
|
||||||
|
@ -1586,19 +1597,38 @@ public class FileSystemRepository implements ContentRepository {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (archivedFileCount.get() == 0) {
|
||||||
|
LOG.debug("Waiting to write to container {} is not required because archivedFileCount is 0", containerName);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
long used = bytesUsed;
|
long used = bytesUsed;
|
||||||
|
|
||||||
if (used == 0L) {
|
// We want to calculate the amount of free space & amount of used space if either it's not yet been calculated
|
||||||
|
// (used == 0) or if it's been at least 1 minute since the space was last checked.
|
||||||
|
final boolean calculateUsed = (used == 0L) || System.currentTimeMillis() > checkUsedCutoffTimestamp;
|
||||||
|
if (calculateUsed) {
|
||||||
try {
|
try {
|
||||||
final long free = getContainerUsableSpace(containerName);
|
final long free = getContainerUsableSpace(containerName);
|
||||||
used = capacity - free;
|
used = capacity - free;
|
||||||
bytesUsed = used;
|
bytesUsed = used;
|
||||||
|
|
||||||
|
// Make sure that we check the amount of usable space again in 1 minute.
|
||||||
|
// This is important because 'bytesUsed' is set in two places:
|
||||||
|
// here, and in the DestroyExpiredArchives task. However, if there are a huge number
|
||||||
|
// of archived files, it can take longer to destroy expired archives than to create & archive data.
|
||||||
|
// As a result, we can have a race condition where that doesn't finish quickly enough and as a result
|
||||||
|
// this doesn't get updated, so the amount of data archived just grows and grows, eventually leading
|
||||||
|
// to running out of disk space.
|
||||||
|
checkUsedCutoffTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(1L);
|
||||||
} catch (final IOException e) {
|
} catch (final IOException e) {
|
||||||
|
checkUsedCutoffTimestamp = 0L;
|
||||||
|
LOG.warn("Failed to determine how much disk space is available for container {}", containerName, e);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return used >= backPressureBytes && archivedFileCount.get() > 0;
|
return used >= backPressureBytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void waitForArchiveExpiration() {
|
public void waitForArchiveExpiration() {
|
||||||
|
@ -1630,12 +1660,15 @@ public class FileSystemRepository implements ContentRepository {
|
||||||
try {
|
try {
|
||||||
final long free = getContainerUsableSpace(containerName);
|
final long free = getContainerUsableSpace(containerName);
|
||||||
bytesUsed = capacity - free;
|
bytesUsed = capacity - free;
|
||||||
|
checkUsedCutoffTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(1L);
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
|
LOG.warn("Failed to determine how much disk space is available for container {}", containerName, e);
|
||||||
bytesUsed = 0L;
|
bytesUsed = 0L;
|
||||||
|
checkUsedCutoffTimestamp = 0L; // Signal that the free space should be calculated again next time it's checked.
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.debug("Container {} signaled to allow Content Claim Creation", containerName);
|
LOG.debug("Container {} signaled to allow Content Claim Creation", containerName);
|
||||||
condition.signal();
|
condition.signalAll();
|
||||||
} finally {
|
} finally {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue