HBASE-27321 The ReplicationLogCleaner is not thread safe but can be called from different threads at the same time (#4730)

Signed-off-by: Xin Sun <ddupgs@gmail.com>
This commit is contained in:
Duo Zhang 2022-08-27 23:41:28 +08:00 committed by GitHub
parent 06728e554c
commit 37651ee1b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 244 additions and 57 deletions

View File

@ -182,13 +182,14 @@ public class ChoreService {
* @param chore The Chore to be rescheduled. If the chore is not scheduled with this ChoreService
* yet then this call is equivalent to a call to scheduleChore.
*/
private void rescheduleChore(ScheduledChore chore) {
private void rescheduleChore(ScheduledChore chore, boolean immediately) {
if (scheduledChores.containsKey(chore)) {
ScheduledFuture<?> future = scheduledChores.get(chore);
future.cancel(false);
}
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(),
chore.getPeriod(), chore.getTimeUnit());
// set initial delay to 0 as we want to run it immediately
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(chore,
immediately ? 0 : chore.getPeriod(), chore.getPeriod(), chore.getTimeUnit());
scheduledChores.put(chore, future);
}
@ -244,7 +245,7 @@ public class ChoreService {
allowedOnPath = ".*/org/apache/hadoop/hbase/ScheduledChore.java")
synchronized void triggerNow(ScheduledChore chore) {
assert chore.getChoreService() == this;
rescheduleChore(chore);
rescheduleChore(chore, true);
}
/** Returns number of chores that this service currently has scheduled */
@ -343,7 +344,7 @@ public class ChoreService {
// the chore is NOT rescheduled, future executions of this chore will be delayed more and
// more on each iteration. This hurts us because the ScheduledThreadPoolExecutor allocates
// idle threads to chores based on how delayed they are.
rescheduleChore(chore);
rescheduleChore(chore, false);
printChoreDetails("onChoreMissedStartTime", chore);
}

View File

@ -29,6 +29,8 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
@ -1632,8 +1634,16 @@ public class MasterRpcServices extends HBaseRpcServicesBase<HMaster>
public RunCleanerChoreResponse runCleanerChore(RpcController c, RunCleanerChoreRequest req)
throws ServiceException {
rpcPreCheck("runCleanerChore");
boolean result = server.getHFileCleaner().runCleaner() && server.getLogCleaner().runCleaner();
return ResponseConverter.buildRunCleanerChoreResponse(result);
try {
CompletableFuture<Boolean> fileCleanerFuture = server.getHFileCleaner().triggerCleanerNow();
CompletableFuture<Boolean> logCleanerFuture = server.getLogCleaner().triggerCleanerNow();
boolean result = fileCleanerFuture.get() && logCleanerFuture.get();
return ResponseConverter.buildRunCleanerChoreResponse(result);
} catch (InterruptedException e) {
throw new ServiceException(e);
} catch (ExecutionException e) {
throw new ServiceException(e.getCause());
}
}
@Override

View File

@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@ -81,6 +80,8 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
private final AtomicBoolean enabled = new AtomicBoolean(true);
protected List<T> cleanersChain;
protected List<String> excludeDirs;
private CompletableFuture<Boolean> future;
private boolean forceRun;
public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) {
@ -168,10 +169,10 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
* @param confKey key to get the file cleaner classes from the configuration
*/
private void initCleanerChain(String confKey) {
this.cleanersChain = new LinkedList<>();
String[] logCleaners = conf.getStrings(confKey);
if (logCleaners != null) {
for (String className : logCleaners) {
this.cleanersChain = new ArrayList<>();
String[] cleaners = conf.getStrings(confKey);
if (cleaners != null) {
for (String className : cleaners) {
className = className.trim();
if (className.isEmpty()) {
continue;
@ -209,25 +210,61 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
}
@Override
protected void chore() {
if (getEnabled()) {
try {
pool.latchCountUp();
if (runCleaner()) {
LOG.trace("Cleaned all WALs under {}", oldFileDir);
} else {
LOG.trace("WALs outstanding under {}", oldFileDir);
}
} finally {
pool.latchCountDown();
protected boolean initialChore() {
synchronized (this) {
if (forceRun) {
// wake up the threads waiting in triggerCleanerNow, as a triggerNow may triggers the first
// loop where we will only call initialChore. We need to trigger another run immediately.
forceRun = false;
notifyAll();
}
}
return true;
}
@Override
protected void chore() {
CompletableFuture<Boolean> f;
synchronized (this) {
if (!enabled.get()) {
if (!forceRun) {
LOG.trace("Cleaner chore {} disabled! Not cleaning.", getName());
return;
} else {
LOG.info("Force executing cleaner chore {} when disabled", getName());
}
}
if (future != null) {
LOG.warn("A cleaner chore {}'s run is in progress, give up running", getName());
return;
}
f = new CompletableFuture<>();
future = f;
notifyAll();
}
pool.latchCountUp();
try {
preRunCleaner();
pool.execute(() -> traverseAndDelete(oldFileDir, true, f));
if (f.get()) {
LOG.trace("Cleaned all files under {}", oldFileDir);
} else {
LOG.trace("Files outstanding under {}", oldFileDir);
}
} catch (Exception e) {
LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
} finally {
postRunCleaner();
synchronized (this) {
future = null;
forceRun = false;
}
pool.latchCountDown();
// After each cleaner chore, checks if received reconfigure notification while cleaning.
// First in cleaner turns off notification, to avoid another cleaner updating pool again.
// This cleaner is waiting for other cleaners finishing their jobs.
// To avoid missing next chore, only wait 0.8 * period, then shutdown.
pool.tryUpdatePoolSize((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
} else {
LOG.trace("Cleaner chore disabled! Not cleaning.");
}
}
@ -235,15 +272,24 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
cleanersChain.forEach(FileCleanerDelegate::preClean);
}
public boolean runCleaner() {
preRunCleaner();
try {
CompletableFuture<Boolean> future = new CompletableFuture<>();
pool.execute(() -> traverseAndDelete(oldFileDir, true, future));
return future.get();
} catch (Exception e) {
LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
return false;
private void postRunCleaner() {
cleanersChain.forEach(FileCleanerDelegate::postClean);
}
/**
* Trigger the cleaner immediately and return a CompletableFuture for getting the result. Return
* {@code true} means all the old files have been deleted, otherwise {@code false}.
*/
public synchronized CompletableFuture<Boolean> triggerCleanerNow() throws InterruptedException {
for (;;) {
if (future != null) {
return future;
}
forceRun = true;
if (!triggerNow()) {
return CompletableFuture.completedFuture(false);
}
wait();
}
}
@ -396,9 +442,6 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
return pool.getSize();
}
/**
* n
*/
public boolean setEnabled(final boolean enabled) {
return this.enabled.getAndSet(enabled);
}
@ -449,7 +492,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
(voidObj, e) -> {
if (e != null) {
result.completeExceptionally(e);
result.completeExceptionally(FutureUtils.unwrapCompletionException(e));
return;
}
try {

View File

@ -49,6 +49,12 @@ public interface FileCleanerDelegate extends Configurable, Stoppable {
default void preClean() {
}
/**
* Used to do some cleanup work
*/
default void postClean() {
}
/**
* Check if a empty directory with no subdirs or subfiles can be deleted
* @param dir Path of the directory

View File

@ -66,6 +66,12 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
}
}
@Override
public void postClean() {
// release memory
wals = null;
}
@Override
public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
// all members of this class are null if replication is disabled,

View File

@ -449,7 +449,7 @@ public class TestTableSnapshotScanner {
// set file modify time and then run cleaner
long time = EnvironmentEdgeManager.currentTime() - TimeToLiveHFileCleaner.DEFAULT_TTL * 1000;
traverseAndSetFileTime(HFileArchiveUtil.getArchivePath(conf), time);
UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().runCleaner();
UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().triggerCleanerNow().get();
// scan snapshot
try (TableSnapshotScanner scanner =
new TableSnapshotScanner(conf, UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName,

View File

@ -17,19 +17,28 @@
*/
package org.apache.hadoop.hbase.master.cleaner;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.Stoppable;
@ -38,6 +47,7 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.StoppableImplementation;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@ -49,6 +59,8 @@ import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@Category({ MasterTests.class, SmallTests.class })
public class TestCleanerChore {
@ -59,15 +71,17 @@ public class TestCleanerChore {
private static final Logger LOG = LoggerFactory.getLogger(TestCleanerChore.class);
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
private static DirScanPool POOL;
private static ChoreService SERVICE;
@BeforeClass
public static void setup() {
POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
SERVICE = new ChoreService("cleaner", 2, true);
}
@AfterClass
public static void cleanup() throws Exception {
// delete and recreate the test directory, ensuring a clean test dir between tests
SERVICE.shutdown();
UTIL.cleanupTestDir();
POOL.shutdownNow();
}
@ -114,7 +128,6 @@ public class TestCleanerChore {
fs.create(file).close();
assertTrue("test file didn't get created.", fs.exists(file));
final AtomicBoolean fails = new AtomicBoolean(true);
FilterFileSystem filtered = new FilterFileSystem(fs) {
public FileStatus[] listStatus(Path f) throws IOException {
if (fails.get()) {
@ -126,25 +139,38 @@ public class TestCleanerChore {
AllValidPaths chore =
new AllValidPaths("test-retry-ioe", stop, conf, filtered, testDir, confKey, POOL);
SERVICE.scheduleChore(chore);
try {
// trouble talking to the filesystem
// and verify that it accurately reported the failure.
CompletableFuture<Boolean> errorFuture = chore.triggerCleanerNow();
ExecutionException e = assertThrows(ExecutionException.class, () -> errorFuture.get());
assertThat(e.getCause(), instanceOf(IOException.class));
assertThat(e.getCause().getMessage(), containsString("whomp"));
// trouble talking to the filesystem
Boolean result = chore.runCleaner();
// verify that it couldn't clean the files.
assertTrue("test rig failed to inject failure.", fs.exists(file));
assertTrue("test rig failed to inject failure.", fs.exists(child));
// verify that it couldn't clean the files.
assertTrue("test rig failed to inject failure.", fs.exists(file));
assertTrue("test rig failed to inject failure.", fs.exists(child));
// and verify that it accurately reported the failure.
assertFalse("chore should report that it failed.", result);
// filesystem is back
fails.set(false);
for (;;) {
CompletableFuture<Boolean> succFuture = chore.triggerCleanerNow();
// the reset of the future is async, so it is possible that we get the previous future
// again.
if (succFuture != errorFuture) {
// verify that it accurately reported success.
assertTrue("chore should claim it succeeded.", succFuture.get());
break;
}
}
// verify everything is gone.
assertFalse("file should have been destroyed.", fs.exists(file));
assertFalse("directory should have been destroyed.", fs.exists(child));
// filesystem is back
fails.set(false);
result = chore.runCleaner();
// verify everything is gone.
assertFalse("file should have been destroyed.", fs.exists(file));
assertFalse("directory should have been destroyed.", fs.exists(child));
// and verify that it accurately reported success.
assertTrue("chore should claim it succeeded.", result);
} finally {
chore.cancel();
}
}
@Test
@ -536,6 +562,57 @@ public class TestCleanerChore {
assertEquals(1, CleanerChore.calculatePoolSize("0.0"));
}
@Test
public void testTriggerCleaner() throws Exception {
Stoppable stop = new StoppableImplementation();
Configuration conf = UTIL.getConfiguration();
Path testDir = UTIL.getDataTestDir();
FileSystem fs = UTIL.getTestFileSystem();
fs.mkdirs(testDir);
String confKey = "hbase.test.cleaner.delegates";
conf.set(confKey, AlwaysDelete.class.getName());
final AllValidPaths chore =
new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL);
try {
SERVICE.scheduleChore(chore);
assertTrue(chore.triggerCleanerNow().get());
chore.setEnabled(false);
// should still runnable
assertTrue(chore.triggerCleanerNow().get());
} finally {
chore.cancel();
}
}
@Test
public void testRescheduleNoConcurrencyRun() throws Exception {
Stoppable stop = new StoppableImplementation();
Configuration conf = UTIL.getConfiguration();
Path testDir = UTIL.getDataTestDir();
FileSystem fs = UTIL.getTestFileSystem();
fs.mkdirs(testDir);
fs.createNewFile(new Path(testDir, "test"));
String confKey = "hbase.test.cleaner.delegates";
conf.set(confKey, GetConcurrency.class.getName());
AtomicInteger maxConcurrency = new AtomicInteger();
final AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir,
confKey, POOL, ImmutableMap.of("maxConcurrency", maxConcurrency));
try {
SERVICE.scheduleChore(chore);
for (int i = 0; i < 100; i++) {
chore.triggerNow();
Thread.sleep(5 + ThreadLocalRandom.current().nextInt(5));
}
Thread.sleep(1000);
// set a barrier here to make sure that the previous runs are also finished
assertFalse(chore.triggerCleanerNow().get());
// make sure we do not have multiple cleaner runs at the same time
assertEquals(1, maxConcurrency.get());
} finally {
chore.cancel();
}
}
private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException {
for (int i = 0; i < numOfFiles; i++) {
int xMega = 1 + ThreadLocalRandom.current().nextInt(3); // size of each file is between 1~3M
@ -556,6 +633,11 @@ public class TestCleanerChore {
super(name, Integer.MAX_VALUE, s, conf, fs, oldFileDir, confkey, pool);
}
public AllValidPaths(String name, Stoppable s, Configuration conf, FileSystem fs,
Path oldFileDir, String confkey, DirScanPool pool, Map<String, Object> params) {
super(name, Integer.MAX_VALUE, s, conf, fs, oldFileDir, confkey, pool, params, null);
}
// all paths are valid
@Override
protected boolean validate(Path file) {
@ -576,4 +658,43 @@ public class TestCleanerChore {
return false;
}
}
public static class GetConcurrency extends BaseHFileCleanerDelegate {
private final AtomicInteger concurrency = new AtomicInteger();
private AtomicInteger maxConcurrency;
@Override
public void init(Map<String, Object> params) {
maxConcurrency = (AtomicInteger) params.get("maxConcurrency");
}
@Override
public void preClean() {
int c = concurrency.incrementAndGet();
while (true) {
int cur = maxConcurrency.get();
if (c <= cur) {
break;
}
if (maxConcurrency.compareAndSet(cur, c)) {
break;
}
}
}
@Override
public void postClean() {
concurrency.decrementAndGet();
}
@Override
protected boolean isFileDeletable(FileStatus fStat) {
// sleep a while to slow down the process
Threads.sleepWithoutInterrupt(10 + ThreadLocalRandom.current().nextInt(10));
return false;
}
}
}