HBASE-20734 Colocate recovered edits directory with hbase.wal.dir

Signed-off-by: Andrew Purtell <apurtell@apache.org>
Signed-off-by: Reid Chan <reidchan@apache.org>
This commit is contained in:
Zach York 2018-06-27 16:18:53 -07:00 committed by Reid Chan
parent c6a1334528
commit a00adb0b45
8 changed files with 274 additions and 199 deletions

View File

@ -58,6 +58,7 @@ import java.util.NavigableSet;
import java.util.RandomAccess;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
@ -326,6 +327,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private final int rowLockWaitDuration;
static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
private Path regionDir;
private FileSystem walFS;
// The internal wait duration to acquire a lock before read/update
// from the region. It is not per row. The purpose of this wait time
// is to avoid waiting a long time while the region is busy, so that
@ -921,7 +925,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
// Recover any edits if available.
maxSeqId = Math.max(maxSeqId,
replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
replayRecoveredEditsIfAny(maxSeqIdInStores, reporter, status));
// Make sure mvcc is up to max.
this.mvcc.advanceTo(maxSeqId);
}
@ -965,8 +969,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// is opened before recovery completes. So we add a safety bumper to avoid new sequence number
// overlaps used sequence numbers
if (this.writestate.writesEnabled) {
nextSeqid = WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs
.getRegionDir(), nextSeqid, (this.recovering ? (this.flushPerChanges + 10000000) : 1));
nextSeqid = WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(),
nextSeqid, (this.recovering ? (this.flushPerChanges + 10000000) : 1));
} else {
nextSeqid++;
}
@ -1104,11 +1108,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
getRegionServerServices().getServerName(), storeFiles);
WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc, mvcc);
// Store SeqId in HDFS when a region closes
// Store SeqId in WAL FileSystem when a region closes
// checking region folder exists is due to many tests which delete the table folder while a
// table is still online
if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) {
WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(),
if (getWalFileSystem().exists(getWALRegionDir())) {
WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(),
mvcc.getReadPoint(), 0);
}
}
@ -1797,6 +1801,32 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return this.fs;
}
/** @return the WAL {@link HRegionFileSystem} used by this region */
HRegionFileSystem getRegionWALFileSystem() throws IOException {
return new HRegionFileSystem(conf, getWalFileSystem(),
FSUtils.getWALTableDir(conf, htableDescriptor.getTableName()), fs.getRegionInfo());
}
/** @return WAL {@link FileSystem} being used by this region */
FileSystem getWalFileSystem() throws IOException {
if (walFS == null) {
walFS = FSUtils.getWALFileSystem(conf);
}
return walFS;
}
/**
* @return the Region Directory under the WALRootDir
* @throws IOException if there is an error getting WALRootDir
*/
@VisibleForTesting
public Path getWALRegionDir() throws IOException {
if (regionDir == null) {
regionDir = FSUtils.getWALRegionDir(conf, fs.getRegionInfo());
}
return regionDir;
}
@Override
public long getEarliestFlushTimeForAllStores() {
return Collections.min(lastStoreFlushTimeMap.values());
@ -4247,8 +4277,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
* @throws IOException
*/
protected long replayRecoveredEditsIfAny(final Path regiondir,
Map<byte[], Long> maxSeqIdInStores,
protected long replayRecoveredEditsIfAny(Map<byte[], Long> maxSeqIdInStores,
final CancelableProgressable reporter, final MonitoredTask status)
throws IOException {
long minSeqIdForTheRegion = -1;
@ -4259,65 +4288,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
long seqid = minSeqIdForTheRegion;
FileSystem fs = this.fs.getFileSystem();
NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
if (LOG.isDebugEnabled()) {
LOG.debug("Found " + (files == null ? 0 : files.size())
+ " recovered edits file(s) under " + regiondir);
FileSystem walFS = getWalFileSystem();
Path regionDir = getWALRegionDir();
FileSystem rootFS = getFilesystem();
Path defaultRegionDir = getRegionDir(FSUtils.getRootDir(conf), getRegionInfo());
// This is to ensure backwards compatability with HBASE-20723 where recovered edits can appear
// under the root dir even if walDir is set.
NavigableSet<Path> filesUnderRootDir = null;
if (!regionDir.equals(defaultRegionDir)) {
filesUnderRootDir =
WALSplitter.getSplitEditFilesSorted(rootFS, defaultRegionDir);
seqid = Math.max(seqid,
replayRecoveredEditsForPaths(minSeqIdForTheRegion, rootFS, filesUnderRootDir, reporter,
defaultRegionDir));
}
NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(walFS, regionDir);
seqid = Math.max(seqid, replayRecoveredEditsForPaths(minSeqIdForTheRegion, walFS,
files, reporter, regionDir));
if (files == null || files.isEmpty()) return seqid;
for (Path edits: files) {
if (edits == null || !fs.exists(edits)) {
LOG.warn("Null or non-existent edits file: " + edits);
continue;
}
if (isZeroLengthThenDelete(fs, edits)) continue;
long maxSeqId;
String fileName = edits.getName();
maxSeqId = Math.abs(Long.parseLong(fileName));
if (maxSeqId <= minSeqIdForTheRegion) {
if (LOG.isDebugEnabled()) {
String msg = "Maximum sequenceid for this wal is " + maxSeqId
+ " and minimum sequenceid for the region is " + minSeqIdForTheRegion
+ ", skipped the whole file, path=" + edits;
LOG.debug(msg);
}
continue;
}
try {
// replay the edits. Replay can return -1 if everything is skipped, only update
// if seqId is greater
seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
} catch (IOException e) {
boolean skipErrors = conf.getBoolean(
HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
conf.getBoolean(
"hbase.skip.errors",
HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));
if (conf.get("hbase.skip.errors") != null) {
LOG.warn(
"The property 'hbase.skip.errors' has been deprecated. Please use " +
HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
}
if (skipErrors) {
Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
+ "=true so continuing. Renamed " + edits +
" as " + p, e);
} else {
throw e;
}
}
}
// The edits size added into rsAccounting during this replaying will not
// be required any more. So just clear it.
if (this.rsAccounting != null) {
this.rsAccounting.clearRegionReplayEditsSize(getRegionInfo().getRegionName());
}
if (seqid > minSeqIdForTheRegion) {
// Then we added some edits to memory. Flush and cleanup split edit files.
internalFlushcache(null, seqid, stores.values(), status, false);
@ -4327,16 +4316,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// For debugging data loss issues!
// If this flag is set, make use of the hfile archiving by making recovered.edits a fake
// column family. Have to fake out file type too by casting our recovered.edits as storefiles
String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName();
Set<StoreFile> fakeStoreFiles = new HashSet<StoreFile>(files.size());
String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regionDir).getName();
Set<StoreFile> fakeStoreFiles = new HashSet<>(files.size());
for (Path file: files) {
fakeStoreFiles.add(new StoreFile(getRegionFileSystem().getFileSystem(), file, this.conf,
null, null));
fakeStoreFiles.add(
new StoreFile(walFS, file, this.conf, null, null));
}
getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
getRegionWALFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
} else {
if (filesUnderRootDir != null) {
for (Path file : filesUnderRootDir) {
if (!rootFS.delete(file, false)) {
LOG.error("Failed delete of {} under root directory." + file);
} else {
LOG.debug("Deleted recovered.edits root directory file=" + file);
}
}
}
for (Path file: files) {
if (!fs.delete(file, false)) {
if (!walFS.delete(file, false)) {
LOG.error("Failed delete of " + file);
} else {
LOG.debug("Deleted recovered.edits file=" + file);
@ -4346,22 +4344,80 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return seqid;
}
/*
private long replayRecoveredEditsForPaths(long minSeqIdForTheRegion, FileSystem fs,
final NavigableSet<Path> files, final CancelableProgressable reporter, final Path regionDir)
throws IOException {
long seqid = minSeqIdForTheRegion;
if (LOG.isDebugEnabled()) {
LOG.debug("Found " + (files == null ? 0 : files.size()) +
" recovered edits file(s) under " + regionDir);
}
if (files == null || files.isEmpty()) {
return seqid;
}
for (Path edits : files) {
if (edits == null || !walFS.exists(edits)) {
LOG.warn("Null or non-existent edits file: " + edits);
continue;
}
if (isZeroLengthThenDelete(walFS, edits)) {
continue;
}
long maxSeqId;
String fileName = edits.getName();
maxSeqId = Math.abs(Long.parseLong(fileName));
if (maxSeqId <= minSeqIdForTheRegion) {
if (LOG.isDebugEnabled()) {
String msg = "Maximum sequenceid for this wal is " + maxSeqId +
" and minimum sequenceid for the region is " + minSeqIdForTheRegion
+ ", skipped the whole file, path=" + edits;
LOG.debug(msg);
}
continue;
}
try {
// replay the edits. Replay can return -1 if everything is skipped, only update
// if seqId is greater
seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter, fs));
} catch (IOException e) {
boolean skipErrors = conf.getBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
conf.getBoolean("hbase.skip.errors",
HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));
if (conf.get("hbase.skip.errors") != null) {
LOG.warn("The property 'hbase.skip.errors' has been deprecated. Please use " +
HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
}
if (skipErrors) {
Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits);
LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS +
"=true so continuing. Renamed " + edits + " as " + p, e);
} else {
throw e;
}
}
}
return seqid;
}
/**
* @param edits File of recovered edits.
* @param maxSeqIdInStores Maximum sequenceid found in each store. Edits in wal
* must be larger than this to be replayed for each store.
* @param reporter
* must be larger than this to be replayed for each store.
* @param reporter CacelableProgressable reporter
* @return the sequence id of the last edit added to this region out of the
* recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
* @throws IOException
*/
private long replayRecoveredEdits(final Path edits,
Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
private long replayRecoveredEdits(final Path edits, Map<byte[], Long> maxSeqIdInStores,
final CancelableProgressable reporter, final FileSystem fs)
throws IOException {
String msg = "Replaying edits from " + edits;
LOG.info(msg);
MonitoredTask status = TaskMonitor.get().createStatus(msg);
FileSystem fs = this.fs.getFileSystem();
status.setStatus("Opening recovered edits");
WAL.Reader reader = null;
@ -8316,7 +8372,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
48 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
50 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
(15 * Bytes.SIZEOF_LONG) +
5 * Bytes.SIZEOF_BOOLEAN);

View File

@ -1163,6 +1163,20 @@ public abstract class FSUtils {
return true;
}
/**
* Returns the WAL region directory based on the region info
* @param conf configuration to determine WALRootDir
* @param regionInfo used to get region and table
* @return the region directory used to store WALs under the WALRootDir
* @throws IOException if there is an exception determining the WALRootDir
*/
public static Path getWALRegionDir(final Configuration conf,
final HRegionInfo regionInfo)
throws IOException {
return new Path(getWALTableDir(conf, regionInfo.getTable()),
regionInfo.getEncodedName());
}
/**
* Checks if meta region exists
*
@ -1313,6 +1327,19 @@ public abstract class FSUtils {
tableName.getQualifierAsString());
}
/**
* Returns the Table directory under the WALRootDir for the specified table name
* @param conf configuration used to get the WALRootDir
* @param tableName Table to get the directory for
* @return a path to the WAL table directory for the specified table
* @throws IOException if there is an exception determining the WALRootDir
*/
public static Path getWALTableDir(final Configuration conf, final TableName tableName)
throws IOException {
return new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()),
tableName.getQualifierAsString());
}
/**
* Returns the {@link org.apache.hadoop.hbase.TableName} object representing
* the table directory under

View File

@ -143,7 +143,7 @@ public class WALSplitter {
// Parameters for split process
protected final Path walDir;
protected final FileSystem fs;
protected final FileSystem walFS;
protected final Configuration conf;
// Major subcomponents of the split process.
@ -190,14 +190,14 @@ public class WALSplitter {
@VisibleForTesting
WALSplitter(final WALFactory factory, Configuration conf, Path walDir,
FileSystem fs, LastSequenceId idChecker,
FileSystem walFS, LastSequenceId idChecker,
CoordinatedStateManager csm, RecoveryMode mode) {
this.conf = HBaseConfiguration.create(conf);
String codecClassName = conf
.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
this.walDir = walDir;
this.fs = fs;
this.walFS = walFS;
this.sequenceIdChecker = idChecker;
this.csm = (BaseCoordinatedStateManager)csm;
this.walFactory = factory;
@ -238,7 +238,7 @@ public class WALSplitter {
* <p>
* @param walDir
* @param logfile
* @param fs
* @param walFS FileSystem to use for WAL reading and splitting
* @param conf
* @param reporter
* @param idChecker
@ -246,10 +246,10 @@ public class WALSplitter {
* @return false if it is interrupted by the progress-able.
* @throws IOException
*/
public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem fs,
public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
CoordinatedStateManager cp, RecoveryMode mode, final WALFactory factory) throws IOException {
WALSplitter s = new WALSplitter(factory, conf, walDir, fs, idChecker, cp, mode);
WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, idChecker, cp, mode);
return s.splitLogFile(logfile, reporter);
}
@ -317,7 +317,7 @@ public class WALSplitter {
in = getReader(logfile, skipErrors, reporter);
} catch (CorruptedLogFileException e) {
LOG.warn("Could not get reader, corrupted log file " + logPath, e);
ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), fs);
ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS);
isCorrupted = true;
}
if (in == null) {
@ -409,7 +409,7 @@ public class WALSplitter {
} catch (CorruptedLogFileException e) {
LOG.warn("Could not parse, corrupted log file " + logPath, e);
csm.getSplitLogWorkerCoordination().markCorrupted(walDir,
logfile.getPath().getName(), fs);
logfile.getPath().getName(), walFS);
isCorrupted = true;
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
@ -457,31 +457,30 @@ public class WALSplitter {
*/
public static void finishSplitLogFile(String logfile,
Configuration conf) throws IOException {
Path rootdir = FSUtils.getWALRootDir(conf);
Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
Path walDir = FSUtils.getWALRootDir(conf);
Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path logPath;
if (FSUtils.isStartingWithPath(rootdir, logfile)) {
if (FSUtils.isStartingWithPath(walDir, logfile)) {
logPath = new Path(logfile);
} else {
logPath = new Path(rootdir, logfile);
logPath = new Path(walDir, logfile);
}
finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
finishSplitLogFile(walDir, oldLogDir, logPath, conf);
}
private static void finishSplitLogFile(Path rootdir, Path oldLogDir,
private static void finishSplitLogFile(Path walDir, Path oldLogDir,
Path logPath, Configuration conf) throws IOException {
List<Path> processedLogs = new ArrayList<Path>();
List<Path> corruptedLogs = new ArrayList<Path>();
FileSystem fs;
fs = rootdir.getFileSystem(conf);
if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
FileSystem walFS = walDir.getFileSystem(conf);
if (ZKSplitLog.isCorrupted(walDir, logPath.getName(), walFS)) {
corruptedLogs.add(logPath);
} else {
processedLogs.add(logPath);
}
archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
fs.delete(stagingDir, true);
archiveLogs(corruptedLogs, processedLogs, oldLogDir, walFS, conf);
Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, logPath.getName());
walFS.delete(stagingDir, true);
}
/**
@ -492,28 +491,28 @@ public class WALSplitter {
* @param corruptedLogs
* @param processedLogs
* @param oldLogDir
* @param fs
* @param walFS FileSystem to use for WAL archival
* @param conf
* @throws IOException
*/
private static void archiveLogs(
final List<Path> corruptedLogs,
final List<Path> processedLogs, final Path oldLogDir,
final FileSystem fs, final Configuration conf) throws IOException {
final FileSystem walFS, final Configuration conf) throws IOException {
final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), conf.get(
"hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME));
if (!fs.mkdirs(corruptDir)) {
if (!walFS.mkdirs(corruptDir)) {
LOG.info("Unable to mkdir " + corruptDir);
}
fs.mkdirs(oldLogDir);
walFS.mkdirs(oldLogDir);
// this method can get restarted or called multiple times for archiving
// the same log files.
for (Path corrupted : corruptedLogs) {
Path p = new Path(corruptDir, corrupted.getName());
if (fs.exists(corrupted)) {
if (!fs.rename(corrupted, p)) {
if (walFS.exists(corrupted)) {
if (!walFS.rename(corrupted, p)) {
LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
} else {
LOG.warn("Moved corrupted log " + corrupted + " to " + p);
@ -523,8 +522,8 @@ public class WALSplitter {
for (Path p : processedLogs) {
Path newPath = FSHLog.getWALArchivePath(oldLogDir, p);
if (fs.exists(p)) {
if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
if (walFS.exists(p)) {
if (!FSUtils.renameAndSetModifyTime(walFS, p, newPath)) {
LOG.warn("Unable to move " + p + " to " + newPath);
} else {
LOG.info("Archived processed log " + p + " to " + newPath);
@ -550,35 +549,28 @@ public class WALSplitter {
@VisibleForTesting
static Path getRegionSplitEditsPath(final Entry logEntry, String fileNameBeingSplit,
String tmpDirName, Configuration conf) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path rootDir = FSUtils.getRootDir(conf);
Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTablename());
FileSystem walFS = FSUtils.getWALFileSystem(conf);
Path tableDir = FSUtils.getWALTableDir(conf, logEntry.getKey().getTablename());
String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
Path dir = getRegionDirRecoveredEditsDir(regiondir);
Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionName);
Path dir = getRegionDirRecoveredEditsDir(regionDir);
if (!fs.exists(regiondir)) {
LOG.info("This region's directory doesn't exist: "
+ regiondir.toString() + ". It is very likely that it was" +
" already split so it's safe to discard those edits.");
return null;
}
if (fs.exists(dir) && fs.isFile(dir)) {
if (walFS.exists(dir) && walFS.isFile(dir)) {
Path tmp = new Path(tmpDirName);
if (!fs.exists(tmp)) {
fs.mkdirs(tmp);
if (!walFS.exists(tmp)) {
walFS.mkdirs(tmp);
}
tmp = new Path(tmp,
HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
LOG.warn("Found existing old file: " + dir + ". It could be some "
+ "leftover of an old installation. It should be a folder instead. "
+ "So moving it to " + tmp);
if (!fs.rename(dir, tmp)) {
if (!walFS.rename(dir, tmp)) {
LOG.warn("Failed to sideline old file " + dir);
}
}
if (!fs.exists(dir) && !fs.mkdirs(dir)) {
if (!walFS.exists(dir) && !walFS.mkdirs(dir)) {
LOG.warn("mkdir failed on " + dir);
}
// Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
@ -616,31 +608,32 @@ public class WALSplitter {
private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
/**
* @param regiondir
* @param regionDir
* This regions directory in the filesystem.
* @return The directory that holds recovered edits files for the region
* <code>regiondir</code>
* <code>regionDir</code>
*/
public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
return new Path(regiondir, HConstants.RECOVERED_EDITS_DIR);
public static Path getRegionDirRecoveredEditsDir(final Path regionDir) {
return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR);
}
/**
* Returns sorted set of edit files made by splitter, excluding files
* with '.temp' suffix.
*
* @param fs
* @param regiondir
* @return Files in passed <code>regiondir</code> as a sorted set.
* @param walFS FileSystem to use for reading Recovered edits files
* @param regionDir Directory where Recovered edits should reside
* @return Files in passed <code>regionDir</code> as a sorted set.
* @throws IOException
*/
public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
final Path regiondir) throws IOException {
public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS,
final Path regionDir) throws IOException {
NavigableSet<Path> filesSorted = new TreeSet<Path>();
Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
if (!fs.exists(editsdir))
Path editsdir = getRegionDirRecoveredEditsDir(regionDir);
if (!walFS.exists(editsdir)) {
return filesSorted;
FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
}
FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() {
@Override
public boolean accept(Path p) {
boolean result = false;
@ -650,7 +643,7 @@ public class WALSplitter {
// In particular, on error, we'll move aside the bad edit file giving
// it a timestamp suffix. See moveAsideBadEditsFile.
Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
result = fs.isFile(p) && m.matches();
result = walFS.isFile(p) && m.matches();
// Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
// because it means splitwal thread is writting this file.
if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
@ -678,17 +671,17 @@ public class WALSplitter {
/**
* Move aside a bad edits file.
*
* @param fs
* @param walFS FileSystem to use for WAL operations
* @param edits
* Edits file to move aside.
* @return The name of the moved aside file.
* @throws IOException
*/
public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
public static Path moveAsideBadEditsFile(final FileSystem walFS, final Path edits)
throws IOException {
Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
+ System.currentTimeMillis());
if (!fs.rename(edits, moveAsideName)) {
if (!walFS.rename(edits, moveAsideName)) {
LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
}
return moveAsideName;
@ -709,21 +702,21 @@ public class WALSplitter {
/**
* Create a file with name as region open sequence id
* @param fs
* @param regiondir
* @param walFS FileSystem to write Sequence file to
* @param regionDir WALRegionDir used to determine where to write edits files
* @param newSeqId
* @param saftyBumper
* @return long new sequence Id value
* @throws IOException
*/
public static long writeRegionSequenceIdFile(final FileSystem fs, final Path regiondir,
public static long writeRegionSequenceIdFile(final FileSystem walFS, final Path regionDir,
long newSeqId, long saftyBumper) throws IOException {
Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir);
long maxSeqId = 0;
FileStatus[] files = null;
if (fs.exists(editsdir)) {
files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
if (walFS.exists(editsdir)) {
files = FSUtils.listStatus(walFS, editsdir, new PathFilter() {
@Override
public boolean accept(Path p) {
return isSequenceIdFile(p);
@ -751,7 +744,7 @@ public class WALSplitter {
Path newSeqIdFile = new Path(editsdir, newSeqId + SEQUENCE_ID_FILE_SUFFIX);
if (newSeqId != maxSeqId) {
try {
if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) {
if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) {
throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
}
if (LOG.isDebugEnabled()) {
@ -768,7 +761,7 @@ public class WALSplitter {
if (newSeqIdFile.equals(status.getPath())) {
continue;
}
fs.delete(status.getPath(), false);
walFS.delete(status.getPath(), false);
}
}
return newSeqId;
@ -796,7 +789,7 @@ public class WALSplitter {
}
try {
FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
FSUtils.getInstance(walFS, conf).recoverFileLease(walFS, path, conf, reporter);
try {
in = getReader(path, reporter);
} catch (EOFException e) {
@ -866,7 +859,7 @@ public class WALSplitter {
*/
protected Writer createWriter(Path logfile)
throws IOException {
return walFactory.createRecoveredEditsWriter(fs, logfile);
return walFactory.createRecoveredEditsWriter(walFS, logfile);
}
/**
@ -874,7 +867,7 @@ public class WALSplitter {
* @return new Reader instance, caller should close
*/
protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
return walFactory.createReader(fs, curLogFile, reporter);
return walFactory.createReader(walFS, curLogFile, reporter);
}
/**
@ -1357,10 +1350,10 @@ public class WALSplitter {
}
// delete the one with fewer wal entries
void deleteOneWithFewerEntries(FileSystem rootFs, WriterAndPath wap, Path dst)
void deleteOneWithFewerEntries(WriterAndPath wap, Path dst)
throws IOException {
long dstMinLogSeqNum = -1L;
try (WAL.Reader reader = walFactory.createReader(fs, dst)) {
try (WAL.Reader reader = walFactory.createReader(walFS, dst)) {
WAL.Entry entry = reader.next();
if (entry != null) {
dstMinLogSeqNum = entry.getKey().getLogSeqNum();
@ -1375,15 +1368,15 @@ public class WALSplitter {
if (wap.minLogSeqNum < dstMinLogSeqNum) {
LOG.warn("Found existing old edits file. It could be the result of a previous failed"
+ " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
+ fs.getFileStatus(dst).getLen());
if (!fs.delete(dst, false)) {
+ walFS.getFileStatus(dst).getLen());
if (!walFS.delete(dst, false)) {
LOG.warn("Failed deleting of old " + dst);
throw new IOException("Failed deleting of old " + dst);
}
} else {
LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p
+ ", length=" + rootFs.getFileStatus(wap.p).getLen());
if (!rootFs.delete(wap.p, false)) {
+ ", length=" + walFS.getFileStatus(wap.p).getLen());
if (!walFS.delete(wap.p, false)) {
LOG.warn("Failed deleting of " + wap.p);
throw new IOException("Failed deleting of " + wap.p);
}
@ -1467,7 +1460,7 @@ public class WALSplitter {
if (LOG.isTraceEnabled()) {
LOG.trace("Closing " + wap.p);
}
FileSystem rootFs = FileSystem.get(conf);
try {
wap.w.close();
} catch (IOException ioe) {
@ -1482,7 +1475,7 @@ public class WALSplitter {
}
if (wap.editsWritten == 0) {
// just remove the empty recovered.edits file
if (rootFs.exists(wap.p) && !rootFs.delete(wap.p, false)) {
if (walFS.exists(wap.p) && !walFS.delete(wap.p, false)) {
LOG.warn("Failed deleting empty " + wap.p);
throw new IOException("Failed deleting empty " + wap.p);
}
@ -1492,14 +1485,14 @@ public class WALSplitter {
Path dst = getCompletedRecoveredEditsFilePath(wap.p,
regionMaximumEditLogSeqNum.get(encodedRegionName));
try {
if (!dst.equals(wap.p) && rootFs.exists(dst)) {
deleteOneWithFewerEntries(rootFs, wap, dst);
if (!dst.equals(wap.p) && walFS.exists(dst)) {
deleteOneWithFewerEntries(wap, dst);
}
// Skip the unit tests which create a splitter that reads and
// writes the data without touching disk.
// TestHLogSplit#testThreading is an example.
if (rootFs.exists(wap.p)) {
if (!rootFs.rename(wap.p, dst)) {
if (walFS.exists(wap.p)) {
if (!walFS.rename(wap.p, dst)) {
throw new IOException("Failed renaming " + wap.p + " to " + dst);
}
LOG.info("Rename " + wap.p + " to " + dst);
@ -1596,12 +1589,11 @@ public class WALSplitter {
if (regionedits == null) {
return null;
}
FileSystem rootFs = FileSystem.get(conf);
if (rootFs.exists(regionedits)) {
if (walFS.exists(regionedits)) {
LOG.warn("Found old edits file. It could be the "
+ "result of a previous failed split attempt. Deleting " + regionedits + ", length="
+ rootFs.getFileStatus(regionedits).getLen());
if (!rootFs.delete(regionedits, false)) {
+ walFS.getFileStatus(regionedits).getLen());
if (!walFS.delete(regionedits, false)) {
LOG.warn("Failed delete of old " + regionedits);
}
}

View File

@ -698,7 +698,7 @@ public class TestHRegion {
for (Store store : region.getStores()) {
maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), minSeqId - 1);
}
long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
assertEquals(maxSeqId, seqId);
region.getMVCC().advanceTo(seqId);
Get get = new Get(row);
@ -752,7 +752,7 @@ public class TestHRegion {
for (Store store : region.getStores()) {
maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), recoverSeqId - 1);
}
long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
assertEquals(maxSeqId, seqId);
region.getMVCC().advanceTo(seqId);
Get get = new Get(row);
@ -796,7 +796,7 @@ public class TestHRegion {
for (Store store : region.getStores()) {
maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), minSeqId);
}
long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, null);
long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, null);
assertEquals(minSeqId, seqId);
}
@ -852,7 +852,7 @@ public class TestHRegion {
for (Store store : region.getStores()) {
maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), recoverSeqId - 1);
}
long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
assertEquals(maxSeqId, seqId);
// assert that the files are flushed

View File

@ -112,7 +112,7 @@ public class TestRecoveredEdits {
// There should be no store files.
assertTrue(storeFiles.isEmpty());
region.close();
Path regionDir = region.getRegionDir(hbaseRootDir, hri);
Path regionDir = region.getWALRegionDir();
Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir);
// This is a little fragile getting this path to a file of 10M of edits.
Path recoveredEditsFile = new Path(

View File

@ -288,7 +288,7 @@ public class TestWALReplay {
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName);
Path basedir = FSUtils.getWALTableDir(conf, tableName);
deleteDir(basedir);
HTableDescriptor htd = createBasic3FamilyHTD(tableName);
@ -915,7 +915,7 @@ public class TestWALReplay {
WALSplitter.splitLogFile(hbaseWALRootDir, listStatus[0],
this.fs, this.conf, null, null, null, mode, wals);
FileStatus[] listStatus1 = this.fs.listStatus(
new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(),
new Path(FSUtils.getWALTableDir(this.conf, tableName), new Path(hri.getEncodedName(),
"recovered.edits")), new PathFilter() {
@Override
public boolean accept(Path p) {
@ -949,7 +949,7 @@ public class TestWALReplay {
final int countPerFamily = 10;
final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.hbaseWALRootDir, this.conf, htd);
Path regionDir = region1.getRegionFileSystem().getRegionDir();
Path regionDir = region1.getWALRegionDir();
HBaseTestingUtility.closeRegionAndWAL(region1);
WAL wal = createWAL(this.conf);

View File

@ -172,7 +172,7 @@ public class TestWALFactory {
Path oldLogDir = new Path(hbaseWALDir, HConstants.HREGION_OLDLOGDIR_NAME);
final int howmany = 3;
HRegionInfo[] infos = new HRegionInfo[3];
Path tabledir = FSUtils.getTableDir(hbaseDir, tableName);
Path tabledir = FSUtils.getWALTableDir(conf, tableName);
fs.mkdirs(tabledir);
for(int i = 0; i < howmany; i++) {
infos[i] = new HRegionInfo(tableName,

View File

@ -247,9 +247,9 @@ public class TestWALSplit {
}
LOG.debug(ls);
LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
LOG.info("Finished splitting out from under zombie.");
Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
Path[] logfiles = getLogForRegion(TABLE_NAME, region);
assertEquals("wrong number of split files for region", numWriters, logfiles.length);
int count = 0;
for (Path logfile: logfiles) {
@ -435,9 +435,9 @@ public class TestWALSplit {
generateWALs(1, 10, -1, 0);
useDifferentDFSClient();
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
assertEquals(1, splitLog.length);
assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
@ -451,9 +451,9 @@ public class TestWALSplit {
generateWALs(1, 10, -1, 100);
useDifferentDFSClient();
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
assertEquals(1, splitLog.length);
assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
@ -478,13 +478,13 @@ public class TestWALSplit {
writer.close();
useDifferentDFSClient();
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
// original log should have 10 test edits, 10 region markers, 1 compaction marker
assertEquals(21, countWAL(originalLog));
Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, hri.getEncodedName());
Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName());
assertEquals(1, splitLog.length);
assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
@ -499,10 +499,10 @@ public class TestWALSplit {
private int splitAndCount(final int expectedFiles, final int expectedEntries)
throws IOException {
useDifferentDFSClient();
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
int result = 0;
for (String region : REGIONS) {
Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
Path[] logfiles = getLogForRegion(TABLE_NAME, region);
assertEquals(expectedFiles, logfiles.length);
int count = 0;
for (Path logfile: logfiles) {
@ -633,7 +633,7 @@ public class TestWALSplit {
walDirContents.add(status.getPath().getName());
}
useDifferentDFSClient();
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
return walDirContents;
} finally {
conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
@ -674,9 +674,9 @@ public class TestWALSplit {
corruptWAL(c1, corruption, true);
useDifferentDFSClient();
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
assertEquals(1, splitLog.length);
int actualCount = 0;
@ -710,7 +710,7 @@ public class TestWALSplit {
conf.setBoolean(HBASE_SKIP_ERRORS, false);
generateWALs(-1);
useDifferentDFSClient();
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
}
@ -726,7 +726,7 @@ public class TestWALSplit {
throws IOException {
generateWALs(-1);
useDifferentDFSClient();
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
FileStatus [] statuses = null;
try {
statuses = fs.listStatus(WALDIR);
@ -756,7 +756,7 @@ public class TestWALSplit {
try {
InstrumentedLogWriter.activateFailure = true;
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
} catch (IOException e) {
assertTrue(e.getMessage().
contains("This exception is instrumented and should only be thrown for testing"));
@ -777,7 +777,7 @@ public class TestWALSplit {
Path regiondir = new Path(TABLEDIR, region);
fs.delete(regiondir, true);
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
assertFalse(fs.exists(regiondir));
}
@ -854,7 +854,7 @@ public class TestWALSplit {
useDifferentDFSClient();
try {
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
assertFalse(fs.exists(WALDIR));
} catch (IOException e) {
@ -1077,7 +1077,7 @@ public class TestWALSplit {
Path regiondir = new Path(TABLEDIR, REGION);
LOG.info("Region directory is" + regiondir);
fs.delete(regiondir, true);
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
assertFalse(fs.exists(regiondir));
}
@ -1090,7 +1090,7 @@ public class TestWALSplit {
injectEmptyFile(".empty", true);
useDifferentDFSClient();
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
assertFalse(fs.exists(tdir));
@ -1115,7 +1115,7 @@ public class TestWALSplit {
Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
useDifferentDFSClient();
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
assertEquals(1, fs.listStatus(corruptDir).length);
@ -1145,14 +1145,14 @@ public class TestWALSplit {
@Override
protected Writer createWriter(Path logfile)
throws IOException {
Writer writer = wals.createRecoveredEditsWriter(this.fs, logfile);
Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile);
// After creating writer, simulate region's
// replayRecoveredEditsIfAny() which gets SplitEditFiles of this
// region and delete them, excluding files with '.temp' suffix.
NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
if (files != null && !files.isEmpty()) {
for (Path file : files) {
if (!this.fs.delete(file, false)) {
if (!this.walFS.delete(file, false)) {
LOG.error("Failed delete of " + file);
} else {
LOG.debug("Deleted recovered.edits file=" + file);
@ -1231,9 +1231,9 @@ public class TestWALSplit {
private Path[] getLogForRegion(Path rootdir, TableName table, String region)
private Path[] getLogForRegion(TableName table, String region)
throws IOException {
Path tdir = FSUtils.getTableDir(rootdir, table);
Path tdir = FSUtils.getWALTableDir(conf, table);
@SuppressWarnings("deprecation")
Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
Bytes.toString(region.getBytes())));