mirror of
https://github.com/apache/nifi.git
synced 2025-02-28 06:29:25 +00:00
NIFI-450: Catch Throwable from all implementations of Runnable in the FileSystemRepository; these are expected to always be running, so if anything odd like an OutOfMemoryError occurs, this needs to be caught rather than allowing the thread to die
This commit is contained in:
parent
d6408046bc
commit
54f3476a4c
@ -791,34 +791,38 @@ public class FileSystemRepository implements ContentRepository {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
// Get all of the Destructable Claims and bin them based on their Container. We do this
|
try {
|
||||||
// because the Container generally maps to a physical partition on the disk, so we want a few
|
// Get all of the Destructable Claims and bin them based on their Container. We do this
|
||||||
// different threads hitting the different partitions but don't want multiple threads hitting
|
// because the Container generally maps to a physical partition on the disk, so we want a few
|
||||||
// the same partition.
|
// different threads hitting the different partitions but don't want multiple threads hitting
|
||||||
final List<ContentClaim> toDestroy = new ArrayList<>();
|
// the same partition.
|
||||||
while (true) {
|
final List<ContentClaim> toDestroy = new ArrayList<>();
|
||||||
toDestroy.clear();
|
while (true) {
|
||||||
contentClaimManager.drainDestructableClaims(toDestroy, 10000);
|
toDestroy.clear();
|
||||||
if (toDestroy.isEmpty()) {
|
contentClaimManager.drainDestructableClaims(toDestroy, 10000);
|
||||||
return;
|
if (toDestroy.isEmpty()) {
|
||||||
}
|
return;
|
||||||
|
}
|
||||||
for (final ContentClaim claim : toDestroy) {
|
|
||||||
final String container = claim.getContainer();
|
for (final ContentClaim claim : toDestroy) {
|
||||||
final BlockingQueue<ContentClaim> claimQueue = reclaimable.get(container);
|
final String container = claim.getContainer();
|
||||||
|
final BlockingQueue<ContentClaim> claimQueue = reclaimable.get(container);
|
||||||
try {
|
|
||||||
while (true) {
|
try {
|
||||||
if (claimQueue.offer(claim, 10, TimeUnit.MINUTES)) {
|
while (true) {
|
||||||
break;
|
if (claimQueue.offer(claim, 10, TimeUnit.MINUTES)) {
|
||||||
} else {
|
break;
|
||||||
LOG.warn("Failed to clean up {} because old claims aren't being cleaned up fast enough. This Content Claim will remain in the Content Repository until NiFi is restarted, at which point it will be cleaned up", claim);
|
} else {
|
||||||
|
LOG.warn("Failed to clean up {} because old claims aren't being cleaned up fast enough. This Content Claim will remain in the Content Repository until NiFi is restarted, at which point it will be cleaned up", claim);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
} catch (final InterruptedException ie) {
|
||||||
|
LOG.warn("Failed to clean up {} because thread was interrupted", claim);
|
||||||
}
|
}
|
||||||
} catch (final InterruptedException ie) {
|
|
||||||
LOG.warn("Failed to clean up {} because thread was interrupted", claim);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} catch (final Throwable t) {
|
||||||
|
LOG.error("Failed to cleanup content claims due to {}", t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1198,23 +1202,23 @@ public class FileSystemRepository implements ContentRepository {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
if (oldestArchiveDate.get() > (System.currentTimeMillis() - maxArchiveMillis)) {
|
try {
|
||||||
final Long minRequiredSpace = minUsableContainerBytesForArchive.get(containerName);
|
if (oldestArchiveDate.get() > (System.currentTimeMillis() - maxArchiveMillis)) {
|
||||||
if (minRequiredSpace == null) {
|
final Long minRequiredSpace = minUsableContainerBytesForArchive.get(containerName);
|
||||||
return;
|
if (minRequiredSpace == null) {
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
final long usableSpace = getContainerUsableSpace(containerName);
|
|
||||||
if (usableSpace > minRequiredSpace) {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} catch (final Exception e) {
|
|
||||||
LOG.error("Failed to determine space available in container {}; will attempt to cleanup archive", containerName);
|
try {
|
||||||
|
final long usableSpace = getContainerUsableSpace(containerName);
|
||||||
|
if (usableSpace > minRequiredSpace) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} catch (final Exception e) {
|
||||||
|
LOG.error("Failed to determine space available in container {}; will attempt to cleanup archive", containerName);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
Thread.currentThread().setName("Cleanup Archive for " + containerName);
|
Thread.currentThread().setName("Cleanup Archive for " + containerName);
|
||||||
final long oldestContainerArchive;
|
final long oldestContainerArchive;
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user