HBASE-20556 Backport HBASE-16490 to branch-1

Backport by: taklonwu@gmail.com

Signed-off-by: Zach York <zyork@apache.org>
This commit is contained in:
Zach York 2018-06-08 11:39:05 -07:00 committed by Zach York
parent 1f3957dda8
commit bed3b2bbae
13 changed files with 116 additions and 36 deletions

View File

@ -29,6 +29,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@ -1240,8 +1241,10 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
//start the hfile archive cleaner thread
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
Map<String, Object> params = new HashMap<String, Object>();
params.put(MASTER, this);
this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
.getFileSystem(), archiveDir);
.getFileSystem(), archiveDir, params);
getChoreService().scheduleChore(hfileCleaner);
serviceStarted = true;
if (LOG.isTraceEnabled()) {

View File

@ -23,6 +23,8 @@ import org.apache.hadoop.hbase.BaseConfigurable;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import java.util.Map;
/**
* Base class for file cleaners which allows subclasses to implement a simple
* isFileDeletable method (which used to be the FileCleanerDelegate contract).
@ -39,6 +41,11 @@ implements FileCleanerDelegate {
}});
}
@Override
public void init(Map<String, Object> params) {
// subclass could override it if needed.
}
/**
* Should the master delete the file or keep it?
* @param fStat file status of the file to check

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.cleaner;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
@ -144,6 +145,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
private final Path oldFileDir;
private final Configuration conf;
protected List<T> cleanersChain;
protected Map<String, Object> params;
private AtomicBoolean enabled = new AtomicBoolean(true);
public static void initChorePool(Configuration conf) {
@ -152,6 +154,12 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
}
}
public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
FileSystem fs, Path oldFileDir, String confKey) {
this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, null);
}
/**
* @param name name of the chore being run
* @param sleepPeriod the period of time to sleep between each run
@ -160,9 +168,10 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
* @param fs handle to the FS
* @param oldFileDir the path to the archived files
* @param confKey configuration key for the classes to instantiate
* @param params members could be used in cleaner
*/
public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
FileSystem fs, Path oldFileDir, String confKey) {
FileSystem fs, Path oldFileDir, String confKey, Map<String, Object> params) {
super(name, s, sleepPeriod);
Preconditions.checkNotNull(POOL, "Chore's pool isn't initialized, please call"
@ -170,7 +179,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
this.fs = fs;
this.oldFileDir = oldFileDir;
this.conf = conf;
this.params = params;
initCleanerChain(confKey);
}
@ -249,6 +258,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
@SuppressWarnings("unchecked")
T cleaner = (T) c.getDeclaredConstructor().newInstance();
cleaner.setConf(conf);
cleaner.init(this.params);
return cleaner;
} catch (Exception e) {
LOG.warn("Can NOT create CleanerDelegate: " + className, e);

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.master.cleaner;
import java.util.Map;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.FileStatus;
@ -36,4 +38,10 @@ public interface FileCleanerDelegate extends Configurable, Stoppable {
* @return files that are ok to delete according to this cleaner
*/
Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files);
/**
* this method is used to pass some instance into subclass
* */
void init(Map<String, Object> params);
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.cleaner;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@ -35,16 +36,23 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins";
public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
Path directory) {
this(period, stopper, conf, fs, directory, null);
}
/**
* @param period the period of time to sleep between each run
* @param stopper the stopper
* @param conf configuration to use
* @param fs handle to the FS
* @param directory directory to be cleaned
* @param params params could be used in subclass of BaseHFileCleanerDelegate
*/
public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
Path directory) {
super("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS);
Path directory, Map<String, Object> params) {
super("HFileCleaner", period, stopper, conf, fs,
directory, MASTER_HFILE_CLEANER_PLUGINS, params);
}
@Override

View File

@ -55,8 +55,8 @@ public class DisabledTableSnapshotHandler extends TakeSnapshotHandler {
* @param masterServices master services provider
*/
public DisabledTableSnapshotHandler(SnapshotDescription snapshot,
final MasterServices masterServices) {
super(snapshot, masterServices);
final MasterServices masterServices, final SnapshotManager snapshotManager) {
super(snapshot, masterServices, snapshotManager);
}
@Override

View File

@ -51,7 +51,7 @@ public class EnabledTableSnapshotHandler extends TakeSnapshotHandler {
public EnabledTableSnapshotHandler(SnapshotDescription snapshot, MasterServices master,
final SnapshotManager manager) {
super(snapshot, master);
super(snapshot, master, manager);
this.coordinator = manager.getCoordinator();
}

View File

@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.locks.ReentrantLock;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
@ -177,7 +178,8 @@ public class SnapshotFileCache implements Stoppable {
// XXX this is inefficient to synchronize on the method, when what we really need to guard against
// is an illegal access to the cache. Really we could do a mutex-guarded pointer swap on the
// cache, but that seems overkill at the moment and isn't necessarily a bottleneck.
public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatus> files)
public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatus> files,
final SnapshotManager snapshotManager)
throws IOException {
List<FileStatus> unReferencedFiles = Lists.newArrayList();
List<String> snapshotsInProgress = null;
@ -192,7 +194,7 @@ public class SnapshotFileCache implements Stoppable {
continue;
}
if (snapshotsInProgress == null) {
snapshotsInProgress = getSnapshotsInProgress();
snapshotsInProgress = getSnapshotsInProgress(snapshotManager);
}
if (snapshotsInProgress.contains(fileName)) {
continue;
@ -266,8 +268,9 @@ public class SnapshotFileCache implements Stoppable {
this.snapshots.clear();
this.snapshots.putAll(known);
}
@VisibleForTesting List<String> getSnapshotsInProgress() throws IOException {
@VisibleForTesting List<String> getSnapshotsInProgress(
final SnapshotManager snapshotManager) throws IOException {
List<String> snapshotInProgress = Lists.newArrayList();
// only add those files to the cache, but not to the known snapshots
Path snapshotTmpDir = new Path(snapshotDir, SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME);
@ -275,20 +278,25 @@ public class SnapshotFileCache implements Stoppable {
FileStatus[] running = FSUtils.listStatus(fs, snapshotTmpDir);
if (running != null) {
for (FileStatus run : running) {
ReentrantLock lock = null;
if (snapshotManager != null) {
lock = snapshotManager.getLocks().acquireLock(run.getPath().getName());
}
try {
snapshotInProgress.addAll(fileInspector.filesUnderSnapshot(run.getPath()));
} catch (CorruptedSnapshotException e) {
// See HBASE-16464
if (e.getCause() instanceof FileNotFoundException) {
// If the snapshot is not in progress, we will delete it
if (!fs.exists(new Path(run.getPath(),
SnapshotDescriptionUtils.SNAPSHOT_IN_PROGRESS))) {
fs.delete(run.getPath(), true);
LOG.warn("delete the " + run.getPath() + " due to exception:", e.getCause());
}
// If the snapshot is corrupt, we will delete it
fs.delete(run.getPath(), true);
LOG.warn("delete the " + run.getPath() + " due to exception:", e.getCause());
} else {
throw e;
}
} finally {
if (lock != null) {
lock.unlock();
}
}
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.snapshot;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -30,6 +31,8 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
@ -57,10 +60,12 @@ public class SnapshotHFileCleaner extends BaseHFileCleanerDelegate {
/** File cache for HFiles in the completed and currently running snapshots */
private SnapshotFileCache cache;
private MasterServices master;
@Override
public synchronized Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
try {
return cache.getUnreferencedFiles(files);
return cache.getUnreferencedFiles(files, master.getSnapshotManager());
} catch (CorruptedSnapshotException cse) {
LOG.debug("Corrupted in-progress snapshot file exception, ignored ", cse);
} catch (IOException e) {
@ -69,6 +74,13 @@ public class SnapshotHFileCleaner extends BaseHFileCleanerDelegate {
return Collections.emptyList();
}
@Override
public void init(Map<String, Object> params) {
if (params.containsKey(HMaster.MASTER)) {
this.master = (MasterServices) params.get(HMaster.MASTER);
}
}
@Override
protected boolean isFileDeletable(FileStatus fStat) {
return false;
@ -93,6 +105,7 @@ public class SnapshotHFileCleaner extends BaseHFileCleanerDelegate {
}
}
@Override
public void stop(String why) {
this.cache.stop(why);

View File

@ -81,6 +81,7 @@ import org.apache.hadoop.hbase.snapshot.TablePartiallyOpenException;
import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.zookeeper.KeeperException;
/**
@ -155,6 +156,16 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
private Path rootDir;
private ExecutorService executorService;
/**
* Locks for snapshot operations
* key is snapshot's filename in progress, value is the related lock
* - create snapshot
* - SnapshotCleaner
* */
private KeyLocker<String> locks = new KeyLocker<String>();
public SnapshotManager() {}
/**
@ -466,7 +477,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
// Take the snapshot of the disabled table
DisabledTableSnapshotHandler handler =
new DisabledTableSnapshotHandler(snapshot, master);
new DisabledTableSnapshotHandler(snapshot, master, this);
snapshotTable(snapshot, handler);
}
@ -1177,4 +1188,9 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
builder.setType(SnapshotDescription.Type.FLUSH);
return builder.build();
}
public KeyLocker<String> getLocks() {
return locks;
}
}

View File

@ -23,6 +23,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -87,6 +88,7 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
protected final MonitoredTask status;
protected final TableName snapshotTable;
protected final SnapshotManifest snapshotManifest;
protected final SnapshotManager snapshotManager;
protected HTableDescriptor htd;
@ -94,13 +96,15 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
* @param snapshot descriptor of the snapshot to take
* @param masterServices master services provider
*/
public TakeSnapshotHandler(SnapshotDescription snapshot, final MasterServices masterServices) {
public TakeSnapshotHandler(SnapshotDescription snapshot, final MasterServices masterServices,
final SnapshotManager snapshotManager) {
super(masterServices, EventType.C_M_SNAPSHOT_TABLE);
assert snapshot != null : "SnapshotDescription must not be nul1";
assert masterServices != null : "MasterServices must not be nul1";
this.master = masterServices;
this.snapshot = snapshot;
this.snapshotManager = snapshotManager;
this.snapshotTable = TableName.valueOf(snapshot.getTable());
this.conf = this.master.getConfiguration();
this.fs = this.master.getMasterFileSystem().getFileSystem();
@ -161,11 +165,12 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
String msg = "Running " + snapshot.getType() + " table snapshot " + snapshot.getName() + " "
+ eventType + " on table " + snapshotTable;
LOG.info(msg);
ReentrantLock lock = snapshotManager.getLocks().acquireLock(snapshot.getName());
status.setStatus(msg);
try {
// If regions move after this meta scan, the region specific snapshot should fail, triggering
// an external exception that gets captured here.
SnapshotDescriptionUtils.createInProgressTag(workingDir, fs);
// write down the snapshot info in the working directory
SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, fs);
snapshotManifest.addTableDescriptor(this.htd);
@ -229,6 +234,7 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
} catch (IOException e) {
LOG.error("Couldn't delete snapshot working directory:" + workingDir);
}
lock.unlock();
releaseTableLock();
}
}

View File

@ -138,8 +138,9 @@ public class TestSnapshotFileCache {
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
"test-snapshot-file-cache-refresh", new SnapshotFiles()) {
@Override
List<String> getSnapshotsInProgress() throws IOException {
List<String> result = super.getSnapshotsInProgress();
List<String> getSnapshotsInProgress(final SnapshotManager snapshotManager)
throws IOException {
List<String> result = super.getSnapshotsInProgress(snapshotManager);
count.incrementAndGet();
return result;
}
@ -157,7 +158,7 @@ public class TestSnapshotFileCache {
FSUtils.logFileSystemState(fs, rootDir, LOG);
List<FileStatus> allStoreFiles = getStoreFilesForSnapshot(complete);
Iterable<FileStatus> deletableFiles = cache.getUnreferencedFiles(allStoreFiles);
Iterable<FileStatus> deletableFiles = cache.getUnreferencedFiles(allStoreFiles, null);
assertTrue(Iterables.isEmpty(deletableFiles));
// no need for tmp dir check as all files are accounted for.
assertEquals(0, count.get() - countBeforeCheck);
@ -166,7 +167,7 @@ public class TestSnapshotFileCache {
// add a random file to make sure we refresh
FileStatus randomFile = mockStoreFile(UUID.randomUUID().toString());
allStoreFiles.add(randomFile);
deletableFiles = cache.getUnreferencedFiles(allStoreFiles);
deletableFiles = cache.getUnreferencedFiles(allStoreFiles, null);
assertEquals(randomFile, Iterables.getOnlyElement(deletableFiles));
assertEquals(1, count.get() - countBeforeCheck); // we check the tmp directory
}
@ -278,7 +279,7 @@ public class TestSnapshotFileCache {
private static Iterable<FileStatus> getNonSnapshotFiles(SnapshotFileCache cache, Path storeFile)
throws IOException {
return cache.getUnreferencedFiles(
Arrays.asList(FSUtils.listStatus(fs, storeFile.getParent()))
Arrays.asList(FSUtils.listStatus(fs, storeFile.getParent())), null
);
}
}

View File

@ -132,7 +132,7 @@ public class TestSnapshotHFileCleaner {
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
"test-snapshot-file-cache-refresh", new SnapshotFiles());
try {
cache.getSnapshotsInProgress();
cache.getSnapshotsInProgress(null);
} catch (CorruptedSnapshotException cse) {
LOG.info("Expected exception " + cse);
} finally {
@ -159,7 +159,7 @@ public class TestSnapshotHFileCleaner {
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
"test-snapshot-file-cache-refresh", new SnapshotFiles());
try {
cache.getSnapshotsInProgress();
cache.getSnapshotsInProgress(null);
} catch (CorruptedSnapshotException cse) {
LOG.info("Expected exception " + cse);
} finally {
@ -167,23 +167,23 @@ public class TestSnapshotHFileCleaner {
}
}
/**
* HBASE-16464
*/
* HBASE-16464
*/
@Test
public void testMissedTmpSnapshot() throws IOException {
SnapshotTestingUtils.SnapshotMock
snapshotMock = new SnapshotTestingUtils.SnapshotMock(TEST_UTIL.getConfiguration(), fs, rootDir);
snapshotMock = new SnapshotTestingUtils.SnapshotMock(TEST_UTIL.getConfiguration(), fs,
rootDir);
SnapshotTestingUtils.SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV2(
SNAPSHOT_NAME_STR, TABLE_NAME_STR);
SNAPSHOT_NAME_STR, TABLE_NAME_STR);
builder.addRegionV2();
builder.missOneRegionSnapshotFile();
long period = Long.MAX_VALUE;
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
"test-snapshot-file-cache-refresh", new SnapshotFiles());
cache.getSnapshotsInProgress();
"test-snapshot-file-cache-refresh", new SnapshotFiles());
cache.getSnapshotsInProgress(null);
assertFalse(fs.exists(builder.getSnapshotsDir()));
}
}