HBASE-27043 Let lock wait timeout to improve performance of SnapshotHFileCleaner (#4437)

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Xiaolin Ha 2022-05-21 06:32:58 +08:00 committed by GitHub
parent bf5f0c7e7f
commit 1aa07d5e6b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 39 additions and 28 deletions

View File

@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
@ -97,6 +98,8 @@ public class SnapshotFileCache implements Stoppable {
private ImmutableMap<String, SnapshotDirectoryInfo> snapshots = ImmutableMap.of();
private final Timer refreshTimer;
private static final int LOCK_TIMEOUT_MS = 30000;
/**
* Create a snapshot file cache for all snapshots under the specified [root]/.snapshot on the
* filesystem.
@ -192,39 +195,47 @@ public class SnapshotFileCache implements Stoppable {
if (snapshotManager != null) {
lock = snapshotManager.getTakingSnapshotLock().writeLock();
}
if (lock == null || lock.tryLock()) {
try {
if (snapshotManager != null && snapshotManager.isTakingAnySnapshot()) {
LOG.warn("Not checking unreferenced files since snapshot is running, it will "
+ "skip to clean the HFiles this time");
return unReferencedFiles;
}
ImmutableSet<String> currentCache = cache;
for (FileStatus file : files) {
String fileName = file.getPath().getName();
if (!refreshed && !currentCache.contains(fileName)) {
synchronized (this) {
refreshCache();
currentCache = cache;
refreshed = true;
try {
if (lock == null || lock.tryLock(LOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
try {
if (snapshotManager != null && snapshotManager.isTakingAnySnapshot()) {
LOG.warn("Not checking unreferenced files since snapshot is running, it will "
+ "skip to clean the HFiles this time");
return unReferencedFiles;
}
ImmutableSet<String> currentCache = cache;
for (FileStatus file : files) {
String fileName = file.getPath().getName();
if (!refreshed && !currentCache.contains(fileName)) {
synchronized (this) {
refreshCache();
currentCache = cache;
refreshed = true;
}
}
if (currentCache.contains(fileName)) {
continue;
}
if (snapshotsInProgress == null) {
snapshotsInProgress = getSnapshotsInProgress();
}
if (snapshotsInProgress.contains(fileName)) {
continue;
}
unReferencedFiles.add(file);
}
if (currentCache.contains(fileName)) {
continue;
} finally {
if (lock != null) {
lock.unlock();
}
if (snapshotsInProgress == null) {
snapshotsInProgress = getSnapshotsInProgress();
}
if (snapshotsInProgress.contains(fileName)) {
continue;
}
unReferencedFiles.add(file);
}
} finally {
if (lock != null) {
lock.unlock();
}
} else {
LOG.warn("Failed to acquire write lock on taking snapshot after waiting {}ms",
LOCK_TIMEOUT_MS);
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while acquiring write lock on taking snapshot");
Thread.currentThread().interrupt(); // restore the interrupt flag
}
return unReferencedFiles;
}