HBASE-20734 Colocate recovered edits directory with hbase.wal.dir

Amending-Author: Reid Chan <reidchan@apache.org>
Signed-off-by: Reid Chan <reidchan@apache.org>
This commit is contained in:
Zach York 2018-06-27 16:18:53 -07:00 committed by Reid Chan
parent 0f514ab75a
commit 21fafbaf53
14 changed files with 299 additions and 207 deletions

View File

@ -420,6 +420,34 @@ public abstract class CommonFSUtils {
return true;
}
/**
* Returns the WAL region directory based on the given table name and region name
* @param conf configuration to determine WALRootDir
* @param tableName Table that the region is under
* @param encodedRegionName Region name used for creating the final region directory
* @return the region directory used to store WALs under the WALRootDir
* @throws IOException if there is an exception determining the WALRootDir
*/
public static Path getWALRegionDir(final Configuration conf,
final TableName tableName, final String encodedRegionName)
throws IOException {
return new Path(getWALTableDir(conf, tableName),
encodedRegionName);
}
/**
* Returns the Table directory under the WALRootDir for the specified table name
* @param conf configuration used to get the WALRootDir
* @param tableName Table to get the directory for
* @return a path to the WAL table directory for the specified table
* @throws IOException if there is an exception determining the WALRootDir
*/
public static Path getWALTableDir(final Configuration conf, final TableName tableName)
throws IOException {
return new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()),
tableName.getQualifierAsString());
}
/**
* Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
* path rootdir

View File

@ -757,14 +757,16 @@ public class MergeTableRegionsProcedure
}
private void writeMaxSequenceIdFile(MasterProcedureEnv env) throws IOException {
FileSystem fs = env.getMasterServices().getMasterFileSystem().getFileSystem();
FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem();
long maxSequenceId = -1L;
for (RegionInfo region : regionsToMerge) {
maxSequenceId =
Math.max(maxSequenceId, WALSplitter.getMaxRegionSequenceId(fs, getRegionDir(env, region)));
Math.max(maxSequenceId, WALSplitter.getMaxRegionSequenceId(
walFS, getWALRegionDir(env, region)));
}
if (maxSequenceId > 0) {
WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, mergedRegion), maxSequenceId);
WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, mergedRegion),
maxSequenceId);
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.assignment;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
@ -34,13 +35,13 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.yetus.audience.InterfaceAudience;
@ -216,9 +217,10 @@ public class RegionStateStore {
}
private long getOpenSeqNumForParentRegion(RegionInfo region) throws IOException {
MasterFileSystem mfs = master.getMasterFileSystem();
FileSystem walFS = master.getMasterWalManager().getFileSystem();
long maxSeqId =
WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
WALSplitter.getMaxRegionSequenceId(walFS, FSUtils.getWALRegionDir(
master.getConfiguration(), region.getTable(), region.getEncodedName()));
return maxSeqId > 0 ? maxSeqId + 1 : HConstants.NO_SEQNUM;
}

View File

@ -828,12 +828,14 @@ public class SplitTableRegionProcedure
}
private void writeMaxSequenceIdFile(MasterProcedureEnv env) throws IOException {
FileSystem fs = env.getMasterServices().getMasterFileSystem().getFileSystem();
FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem();
long maxSequenceId =
WALSplitter.getMaxRegionSequenceId(fs, getRegionDir(env, getParentRegion()));
WALSplitter.getMaxRegionSequenceId(walFS, getWALRegionDir(env, getParentRegion()));
if (maxSequenceId > 0) {
WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, daughter_1_RI), maxSequenceId);
WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, daughter_2_RI), maxSequenceId);
WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughter_1_RI),
maxSequenceId);
WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughter_2_RI),
maxSequenceId);
}
}

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -128,8 +129,10 @@ public abstract class AbstractStateMachineTableProcedure<TState>
}
}
protected final Path getRegionDir(MasterProcedureEnv env, RegionInfo region) throws IOException {
return env.getMasterServices().getMasterFileSystem().getRegionDir(region);
protected final Path getWALRegionDir(MasterProcedureEnv env, RegionInfo region)
throws IOException {
return FSUtils.getWALRegionDir(env.getMasterConfiguration(),
region.getTable(), region.getEncodedName());
}
/**

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
@ -30,7 +31,6 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -111,13 +111,13 @@ public class DisableTableProcedure
case DISABLE_TABLE_ADD_REPLICATION_BARRIER:
if (env.getMasterServices().getTableDescriptors().get(tableName)
.hasGlobalReplicationScope()) {
MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem();
try (BufferedMutator mutator = env.getMasterServices().getConnection()
.getBufferedMutator(TableName.META_TABLE_NAME)) {
for (RegionInfo region : env.getAssignmentManager().getRegionStates()
.getRegionsOfTable(tableName)) {
long maxSequenceId =
WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
WALSplitter.getMaxRegionSequenceId(walFS, getWALRegionDir(env, region));
long openSeqNum = maxSequenceId > 0 ? maxSequenceId + 1 : HConstants.NO_SEQNUM;
mutator.mutate(MetaTableAccessor.makePutForReplicationBarrier(region, openSeqNum,
EnvironmentEdgeManager.currentTime()));

View File

@ -330,6 +330,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private final int rowLockWaitDuration;
static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
private Path regionDir;
private FileSystem walFS;
// The internal wait duration to acquire a lock before read/update
// from the region. It is not per row. The purpose of this wait time
// is to avoid waiting a long time while the region is busy, so that
@ -943,7 +946,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
stores.forEach(HStore::startReplayingFromWAL);
// Recover any edits if available.
maxSeqId = Math.max(maxSeqId,
replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
replayRecoveredEditsIfAny(maxSeqIdInStores, reporter, status));
// Make sure mvcc is up to max.
this.mvcc.advanceTo(maxSeqId);
} finally {
@ -986,14 +989,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Use maximum of log sequenceid or that which was found in stores
// (particularly if no recovered edits, seqid will be -1).
long maxSeqIdFromFile =
WALSplitter.getMaxRegionSequenceId(fs.getFileSystem(), fs.getRegionDir());
WALSplitter.getMaxRegionSequenceId(getWalFileSystem(), getWALRegionDir());
long nextSeqId = Math.max(maxSeqId, maxSeqIdFromFile) + 1;
// The openSeqNum will always be increase even for read only region, as we rely on it to
// determine whether a region has been successfully reopend, so here we always need to update
// the max sequence id file.
if (RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
LOG.debug("writing seq id for {}", this.getRegionInfo().getEncodedName());
WALSplitter.writeRegionSequenceIdFile(fs.getFileSystem(), fs.getRegionDir(), nextSeqId - 1);
WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(), nextSeqId - 1);
}
LOG.info("Opened {}; next sequenceid={}", this.getRegionInfo().getShortNameToLog(), nextSeqId);
@ -1140,11 +1143,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionEventDesc,
mvcc);
// Store SeqId in HDFS when a region closes
// Store SeqId in WAL FileSystem when a region closes
// checking region folder exists is due to many tests which delete the table folder while a
// table is still online
if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) {
WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(),
if (getWalFileSystem().exists(getWALRegionDir())) {
WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(),
mvcc.getReadPoint());
}
}
@ -1863,6 +1866,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return this.fs;
}
/** @return the WAL {@link HRegionFileSystem} used by this region */
HRegionFileSystem getRegionWALFileSystem() throws IOException {
return new HRegionFileSystem(conf, getWalFileSystem(),
FSUtils.getWALTableDir(conf, htableDescriptor.getTableName()), fs.getRegionInfo());
}
/** @return the WAL {@link FileSystem} being used by this region */
FileSystem getWalFileSystem() throws IOException {
if (walFS == null) {
walFS = FSUtils.getWALFileSystem(conf);
}
return walFS;
}
/**
* @return the Region directory under WALRootDirectory
* @throws IOException if there is an error getting WALRootDir
*/
@VisibleForTesting
public Path getWALRegionDir() throws IOException {
if (regionDir == null) {
regionDir = FSUtils.getWALRegionDir(conf, getRegionInfo().getTable(),
getRegionInfo().getEncodedName());
}
return regionDir;
}
@Override
public long getEarliestFlushTimeForAllStores() {
return Collections.min(lastStoreFlushTimeMap.values());
@ -4466,8 +4496,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
* @throws IOException
*/
protected long replayRecoveredEditsIfAny(final Path regiondir,
Map<byte[], Long> maxSeqIdInStores,
protected long replayRecoveredEditsIfAny(Map<byte[], Long> maxSeqIdInStores,
final CancelableProgressable reporter, final MonitoredTask status)
throws IOException {
long minSeqIdForTheRegion = -1;
@ -4478,14 +4507,75 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
long seqid = minSeqIdForTheRegion;
FileSystem fs = this.fs.getFileSystem();
NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
if (LOG.isDebugEnabled()) {
LOG.debug("Found " + (files == null ? 0 : files.size())
+ " recovered edits file(s) under " + regiondir);
FileSystem walFS = getWalFileSystem();
FileSystem rootFS = getFilesystem();
Path regionDir = getWALRegionDir();
Path defaultRegionDir = getRegionDir(FSUtils.getRootDir(conf), getRegionInfo());
// This is to ensure backwards compatability with HBASE-20723 where recovered edits can appear
// under the root dir even if walDir is set.
NavigableSet<Path> filesUnderRootDir = null;
if (!regionDir.equals(defaultRegionDir)) {
filesUnderRootDir =
WALSplitter.getSplitEditFilesSorted(rootFS, defaultRegionDir);
seqid = Math.max(seqid,
replayRecoveredEditsForPaths(minSeqIdForTheRegion, rootFS, filesUnderRootDir, reporter,
defaultRegionDir));
}
if (files == null || files.isEmpty()) return seqid;
NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(walFS, regionDir);
seqid = Math.max(seqid, replayRecoveredEditsForPaths(minSeqIdForTheRegion, walFS,
files, reporter, regionDir));
if (seqid > minSeqIdForTheRegion) {
// Then we added some edits to memory. Flush and cleanup split edit files.
internalFlushcache(null, seqid, stores.values(), status, false, FlushLifeCycleTracker.DUMMY);
}
// Now delete the content of recovered edits. We're done w/ them.
if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
// For debugging data loss issues!
// If this flag is set, make use of the hfile archiving by making recovered.edits a fake
// column family. Have to fake out file type too by casting our recovered.edits as storefiles
String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regionDir).getName();
Set<HStoreFile> fakeStoreFiles = new HashSet<>(files.size());
for (Path file: files) {
fakeStoreFiles.add(
new HStoreFile(walFS, file, this.conf, null, null, true));
}
getRegionWALFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
} else {
if (filesUnderRootDir != null) {
for (Path file : filesUnderRootDir) {
if (!rootFS.delete(file, false)) {
LOG.error("Failed delete of {} from under the root directory.", file);
} else {
LOG.debug("Deleted recovered.edits under root directory. file=" + file);
}
}
}
for (Path file: files) {
if (!walFS.delete(file, false)) {
LOG.error("Failed delete of " + file);
} else {
LOG.debug("Deleted recovered.edits file=" + file);
}
}
}
return seqid;
}
private long replayRecoveredEditsForPaths(long minSeqIdForTheRegion, FileSystem fs,
final NavigableSet<Path> files, final CancelableProgressable reporter, final Path regionDir)
throws IOException {
long seqid = minSeqIdForTheRegion;
if (LOG.isDebugEnabled()) {
LOG.debug("Found " + (files == null ? 0 : files.size())
+ " recovered edits file(s) under " + regionDir);
}
if (files == null || files.isEmpty()) {
return minSeqIdForTheRegion;
}
for (Path edits: files) {
if (edits == null || !fs.exists(edits)) {
@ -4510,7 +4600,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
// replay the edits. Replay can return -1 if everything is skipped, only update
// if seqId is greater
seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter, fs));
} catch (IOException e) {
boolean skipErrors = conf.getBoolean(
HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
@ -4523,7 +4613,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
}
if (skipErrors) {
Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits);
LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
+ "=true so continuing. Renamed " + edits +
" as " + p, e);
@ -4532,31 +4622,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
}
if (seqid > minSeqIdForTheRegion) {
// Then we added some edits to memory. Flush and cleanup split edit files.
internalFlushcache(null, seqid, stores.values(), status, false, FlushLifeCycleTracker.DUMMY);
}
// Now delete the content of recovered edits. We're done w/ them.
if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
// For debugging data loss issues!
// If this flag is set, make use of the hfile archiving by making recovered.edits a fake
// column family. Have to fake out file type too by casting our recovered.edits as storefiles
String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName();
Set<HStoreFile> fakeStoreFiles = new HashSet<>(files.size());
for (Path file: files) {
fakeStoreFiles.add(
new HStoreFile(getRegionFileSystem().getFileSystem(), file, this.conf, null, null, true));
}
getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
} else {
for (Path file: files) {
if (!fs.delete(file, false)) {
LOG.error("Failed delete of " + file);
} else {
LOG.debug("Deleted recovered.edits file=" + file);
}
}
}
return seqid;
}
@ -4570,12 +4635,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @throws IOException
*/
private long replayRecoveredEdits(final Path edits,
Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter, FileSystem fs)
throws IOException {
String msg = "Replaying edits from " + edits;
LOG.info(msg);
MonitoredTask status = TaskMonitor.get().createStatus(msg);
FileSystem fs = this.fs.getFileSystem();
status.setStatus("Opening recovered edits");
WAL.Reader reader = null;
@ -4729,7 +4793,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
coprocessorHost.postReplayWALs(this.getRegionInfo(), edits);
}
} catch (EOFException eof) {
Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits);
msg = "EnLongAddered EOF. Most likely due to Master failure during " +
"wal splitting, so we have this data in another edit. " +
"Continuing, but renaming " + edits + " as " + p;
@ -4739,7 +4803,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// If the IOE resulted from bad file format,
// then this problem is idempotent and retrying won't help
if (ioe.getCause() instanceof ParseException) {
Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits);
msg = "File corruption enLongAddered! " +
"Continuing, but renaming " + edits + " as " + p;
LOG.warn(msg, ioe);
@ -8023,7 +8087,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
51 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
53 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
(14 * Bytes.SIZEOF_LONG) +
3 * Bytes.SIZEOF_BOOLEAN);

View File

@ -116,7 +116,7 @@ public class WALSplitter {
// Parameters for split process
protected final Path walDir;
protected final FileSystem fs;
protected final FileSystem walFS;
protected final Configuration conf;
// Major subcomponents of the split process.
@ -149,14 +149,14 @@ public class WALSplitter {
@VisibleForTesting
WALSplitter(final WALFactory factory, Configuration conf, Path walDir,
FileSystem fs, LastSequenceId idChecker,
FileSystem walFS, LastSequenceId idChecker,
SplitLogWorkerCoordination splitLogWorkerCoordination) {
this.conf = HBaseConfiguration.create(conf);
String codecClassName = conf
.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
this.walDir = walDir;
this.fs = fs;
this.walFS = walFS;
this.sequenceIdChecker = idChecker;
this.splitLogWorkerCoordination = splitLogWorkerCoordination;
@ -186,11 +186,11 @@ public class WALSplitter {
* <p>
* @return false if it is interrupted by the progress-able.
*/
public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem fs,
public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory)
throws IOException {
WALSplitter s = new WALSplitter(factory, conf, walDir, fs, idChecker,
WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, idChecker,
splitLogWorkerCoordination);
return s.splitLogFile(logfile, reporter);
}
@ -201,13 +201,13 @@ public class WALSplitter {
// which uses this method to do log splitting.
@VisibleForTesting
public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
FileSystem fs, Configuration conf, final WALFactory factory) throws IOException {
FileSystem walFS, Configuration conf, final WALFactory factory) throws IOException {
final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
Collections.singletonList(logDir), null);
List<Path> splits = new ArrayList<>();
if (ArrayUtils.isNotEmpty(logfiles)) {
for (FileStatus logfile: logfiles) {
WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null);
WALSplitter s = new WALSplitter(factory, conf, rootDir, walFS, null, null);
if (s.splitLogFile(logfile, null)) {
finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
if (s.outputSink.splits != null) {
@ -216,7 +216,7 @@ public class WALSplitter {
}
}
}
if (!fs.delete(logDir, true)) {
if (!walFS.delete(logDir, true)) {
throw new IOException("Unable to delete src dir: " + logDir);
}
return splits;
@ -322,10 +322,10 @@ public class WALSplitter {
LOG.warn("Could not parse, corrupted WAL={}", logPath, e);
if (splitLogWorkerCoordination != null) {
// Some tests pass in a csm of null.
splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), fs);
splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS);
} else {
// for tests only
ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), fs);
ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS);
}
isCorrupted = true;
} catch (IOException e) {
@ -373,31 +373,30 @@ public class WALSplitter {
*/
public static void finishSplitLogFile(String logfile,
Configuration conf) throws IOException {
Path rootdir = FSUtils.getWALRootDir(conf);
Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
Path walDir = FSUtils.getWALRootDir(conf);
Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path logPath;
if (FSUtils.isStartingWithPath(rootdir, logfile)) {
if (FSUtils.isStartingWithPath(walDir, logfile)) {
logPath = new Path(logfile);
} else {
logPath = new Path(rootdir, logfile);
logPath = new Path(walDir, logfile);
}
finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
finishSplitLogFile(walDir, oldLogDir, logPath, conf);
}
private static void finishSplitLogFile(Path rootdir, Path oldLogDir,
private static void finishSplitLogFile(Path walDir, Path oldLogDir,
Path logPath, Configuration conf) throws IOException {
List<Path> processedLogs = new ArrayList<>();
List<Path> corruptedLogs = new ArrayList<>();
FileSystem fs;
fs = rootdir.getFileSystem(conf);
if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
FileSystem walFS = walDir.getFileSystem(conf);
if (ZKSplitLog.isCorrupted(walDir, logPath.getName(), walFS)) {
corruptedLogs.add(logPath);
} else {
processedLogs.add(logPath);
}
archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
fs.delete(stagingDir, true);
archiveLogs(corruptedLogs, processedLogs, oldLogDir, walFS, conf);
Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, logPath.getName());
walFS.delete(stagingDir, true);
}
/**
@ -408,30 +407,30 @@ public class WALSplitter {
* @param corruptedLogs
* @param processedLogs
* @param oldLogDir
* @param fs
* @param walFS WAL FileSystem to archive files on.
* @param conf
* @throws IOException
*/
private static void archiveLogs(
final List<Path> corruptedLogs,
final List<Path> processedLogs, final Path oldLogDir,
final FileSystem fs, final Configuration conf) throws IOException {
final FileSystem walFS, final Configuration conf) throws IOException {
final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}",
corruptDir);
}
if (!fs.mkdirs(corruptDir)) {
if (!walFS.mkdirs(corruptDir)) {
LOG.info("Unable to mkdir {}", corruptDir);
}
fs.mkdirs(oldLogDir);
walFS.mkdirs(oldLogDir);
// this method can get restarted or called multiple times for archiving
// the same log files.
for (Path corrupted : corruptedLogs) {
Path p = new Path(corruptDir, corrupted.getName());
if (fs.exists(corrupted)) {
if (!fs.rename(corrupted, p)) {
if (walFS.exists(corrupted)) {
if (!walFS.rename(corrupted, p)) {
LOG.warn("Unable to move corrupted log {} to {}", corrupted, p);
} else {
LOG.warn("Moved corrupted log {} to {}", corrupted, p);
@ -441,8 +440,8 @@ public class WALSplitter {
for (Path p : processedLogs) {
Path newPath = AbstractFSWAL.getWALArchivePath(oldLogDir, p);
if (fs.exists(p)) {
if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
if (walFS.exists(p)) {
if (!FSUtils.renameAndSetModifyTime(walFS, p, newPath)) {
LOG.warn("Unable to move {} to {}", p, newPath);
} else {
LOG.info("Archived processed log {} to {}", p, newPath);
@ -468,35 +467,29 @@ public class WALSplitter {
@VisibleForTesting
static Path getRegionSplitEditsPath(final Entry logEntry, String fileNameBeingSplit,
String tmpDirName, Configuration conf) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path rootDir = FSUtils.getRootDir(conf);
Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTableName());
FileSystem walFS = FSUtils.getWALFileSystem(conf);
Path tableDir = FSUtils.getWALTableDir(conf, logEntry.getKey().getTableName());
String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
Path dir = getRegionDirRecoveredEditsDir(regiondir);
Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionName);
Path dir = getRegionDirRecoveredEditsDir(regionDir);
if (!fs.exists(regiondir)) {
LOG.info("This region's directory does not exist: {}."
+ "It is very likely that it was already split so it is "
+ "safe to discard those edits.", regiondir);
return null;
}
if (fs.exists(dir) && fs.isFile(dir)) {
if (walFS.exists(dir) && walFS.isFile(dir)) {
Path tmp = new Path(tmpDirName);
if (!fs.exists(tmp)) {
fs.mkdirs(tmp);
if (!walFS.exists(tmp)) {
walFS.mkdirs(tmp);
}
tmp = new Path(tmp,
HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
LOG.warn("Found existing old file: {}. It could be some "
+ "leftover of an old installation. It should be a folder instead. "
+ "So moving it to {}", dir, tmp);
if (!fs.rename(dir, tmp)) {
if (!walFS.rename(dir, tmp)) {
LOG.warn("Failed to sideline old file {}", dir);
}
}
if (!fs.exists(dir) && !fs.mkdirs(dir)) {
if (!walFS.exists(dir) && !walFS.mkdirs(dir)) {
LOG.warn("mkdir failed on {}", dir);
}
// Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
@ -534,34 +527,34 @@ public class WALSplitter {
private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
/**
* @param regiondir
* @param regionDir
* This regions directory in the filesystem.
* @return The directory that holds recovered edits files for the region
* <code>regiondir</code>
* <code>regionDir</code>
*/
public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
return new Path(regiondir, HConstants.RECOVERED_EDITS_DIR);
public static Path getRegionDirRecoveredEditsDir(final Path regionDir) {
return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR);
}
/**
* Check whether there is recovered.edits in the region dir
* @param fs FileSystem
* @param walFS FileSystem
* @param conf conf
* @param regionInfo the region to check
* @throws IOException IOException
* @return true if recovered.edits exist in the region dir
*/
public static boolean hasRecoveredEdits(final FileSystem fs,
public static boolean hasRecoveredEdits(final FileSystem walFS,
final Configuration conf, final RegionInfo regionInfo) throws IOException {
// No recovered.edits for non default replica regions
if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
return false;
}
Path rootDir = FSUtils.getRootDir(conf);
//Only default replica region can reach here, so we can use regioninfo
//directly without converting it to default replica's regioninfo.
Path regionDir = HRegion.getRegionDir(rootDir, regionInfo);
NavigableSet<Path> files = getSplitEditFilesSorted(fs, regionDir);
Path regionDir = FSUtils.getWALRegionDir(conf, regionInfo.getTable(),
regionInfo.getEncodedName());
NavigableSet<Path> files = getSplitEditFilesSorted(walFS, regionDir);
return files != null && !files.isEmpty();
}
@ -570,19 +563,19 @@ public class WALSplitter {
* Returns sorted set of edit files made by splitter, excluding files
* with '.temp' suffix.
*
* @param fs
* @param regiondir
* @return Files in passed <code>regiondir</code> as a sorted set.
* @param walFS WAL FileSystem used to retrieving split edits files.
* @param regionDir WAL region dir to look for recovered edits files under.
* @return Files in passed <code>regionDir</code> as a sorted set.
* @throws IOException
*/
public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
final Path regiondir) throws IOException {
public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS,
final Path regionDir) throws IOException {
NavigableSet<Path> filesSorted = new TreeSet<>();
Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
if (!fs.exists(editsdir)) {
Path editsdir = getRegionDirRecoveredEditsDir(regionDir);
if (!walFS.exists(editsdir)) {
return filesSorted;
}
FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() {
@Override
public boolean accept(Path p) {
boolean result = false;
@ -592,7 +585,7 @@ public class WALSplitter {
// In particular, on error, we'll move aside the bad edit file giving
// it a timestamp suffix. See moveAsideBadEditsFile.
Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
result = fs.isFile(p) && m.matches();
result = walFS.isFile(p) && m.matches();
// Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
// because it means splitwal thread is writting this file.
if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
@ -617,17 +610,17 @@ public class WALSplitter {
/**
* Move aside a bad edits file.
*
* @param fs
* @param walFS WAL FileSystem used to rename bad edits file.
* @param edits
* Edits file to move aside.
* @return The name of the moved aside file.
* @throws IOException
*/
public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
public static Path moveAsideBadEditsFile(final FileSystem walFS, final Path edits)
throws IOException {
Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
+ System.currentTimeMillis());
if (!fs.rename(edits, moveAsideName)) {
if (!walFS.rename(edits, moveAsideName)) {
LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
}
return moveAsideName;
@ -646,12 +639,13 @@ public class WALSplitter {
|| file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
}
private static FileStatus[] getSequenceIdFiles(FileSystem fs, Path regionDir) throws IOException {
private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir)
throws IOException {
// TODO: Why are we using a method in here as part of our normal region open where
// there is no splitting involved? Fix. St.Ack 01/20/2017.
Path editsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir);
try {
FileStatus[] files = fs.listStatus(editsDir, WALSplitter::isSequenceIdFile);
FileStatus[] files = walFS.listStatus(editsDir, WALSplitter::isSequenceIdFile);
return files != null ? files : new FileStatus[0];
} catch (FileNotFoundException e) {
return new FileStatus[0];
@ -675,16 +669,16 @@ public class WALSplitter {
/**
* Get the max sequence id which is stored in the region directory. -1 if none.
*/
public static long getMaxRegionSequenceId(FileSystem fs, Path regionDir) throws IOException {
return getMaxSequenceId(getSequenceIdFiles(fs, regionDir));
public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException {
return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir));
}
/**
* Create a file with name as region's max sequence id
*/
public static void writeRegionSequenceIdFile(FileSystem fs, Path regionDir, long newMaxSeqId)
public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId)
throws IOException {
FileStatus[] files = getSequenceIdFiles(fs, regionDir);
FileStatus[] files = getSequenceIdFiles(walFS, regionDir);
long maxSeqId = getMaxSequenceId(files);
if (maxSeqId > newMaxSeqId) {
throw new IOException("The new max sequence id " + newMaxSeqId +
@ -695,7 +689,7 @@ public class WALSplitter {
newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX);
if (newMaxSeqId != maxSeqId) {
try {
if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) {
if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) {
throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
}
LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId,
@ -707,7 +701,7 @@ public class WALSplitter {
// remove old ones
for (FileStatus status : files) {
if (!newSeqIdFile.equals(status.getPath())) {
fs.delete(status.getPath(), false);
walFS.delete(status.getPath(), false);
}
}
}
@ -734,7 +728,7 @@ public class WALSplitter {
}
try {
FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
FSUtils.getInstance(walFS, conf).recoverFileLease(walFS, path, conf, reporter);
try {
in = getReader(path, reporter);
} catch (EOFException e) {
@ -801,7 +795,7 @@ public class WALSplitter {
*/
protected Writer createWriter(Path logfile)
throws IOException {
return walFactory.createRecoveredEditsWriter(fs, logfile);
return walFactory.createRecoveredEditsWriter(walFS, logfile);
}
/**
@ -809,7 +803,7 @@ public class WALSplitter {
* @return new Reader instance, caller should close
*/
protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
return walFactory.createReader(fs, curLogFile, reporter);
return walFactory.createReader(walFS, curLogFile, reporter);
}
/**
@ -1284,10 +1278,10 @@ public class WALSplitter {
}
// delete the one with fewer wal entries
private void deleteOneWithFewerEntries(FileSystem rootFs, WriterAndPath wap, Path dst)
private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst)
throws IOException {
long dstMinLogSeqNum = -1L;
try (WAL.Reader reader = walFactory.createReader(fs, dst)) {
try (WAL.Reader reader = walFactory.createReader(walFS, dst)) {
WAL.Entry entry = reader.next();
if (entry != null) {
dstMinLogSeqNum = entry.getKey().getSequenceId();
@ -1299,15 +1293,15 @@ public class WALSplitter {
if (wap.minLogSeqNum < dstMinLogSeqNum) {
LOG.warn("Found existing old edits file. It could be the result of a previous failed"
+ " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
+ fs.getFileStatus(dst).getLen());
if (!fs.delete(dst, false)) {
+ walFS.getFileStatus(dst).getLen());
if (!walFS.delete(dst, false)) {
LOG.warn("Failed deleting of old {}", dst);
throw new IOException("Failed deleting of old " + dst);
}
} else {
LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p
+ ", length=" + rootFs.getFileStatus(wap.p).getLen());
if (!rootFs.delete(wap.p, false)) {
+ ", length=" + walFS.getFileStatus(wap.p).getLen());
if (!walFS.delete(wap.p, false)) {
LOG.warn("Failed deleting of {}", wap.p);
throw new IOException("Failed deleting of " + wap.p);
}
@ -1391,10 +1385,7 @@ public class WALSplitter {
Path closeWriter(String encodedRegionName, WriterAndPath wap,
List<IOException> thrown) throws IOException{
if (LOG.isTraceEnabled()) {
LOG.trace("Closing " + wap.p);
}
FileSystem rootFs = FileSystem.get(conf);
try {
wap.w.close();
} catch (IOException ioe) {
@ -1409,7 +1400,7 @@ public class WALSplitter {
}
if (wap.editsWritten == 0) {
// just remove the empty recovered.edits file
if (rootFs.exists(wap.p) && !rootFs.delete(wap.p, false)) {
if (walFS.exists(wap.p) && !walFS.delete(wap.p, false)) {
LOG.warn("Failed deleting empty " + wap.p);
throw new IOException("Failed deleting empty " + wap.p);
}
@ -1419,14 +1410,14 @@ public class WALSplitter {
Path dst = getCompletedRecoveredEditsFilePath(wap.p,
regionMaximumEditLogSeqNum.get(encodedRegionName));
try {
if (!dst.equals(wap.p) && rootFs.exists(dst)) {
deleteOneWithFewerEntries(rootFs, wap, dst);
if (!dst.equals(wap.p) && walFS.exists(dst)) {
deleteOneWithFewerEntries(wap, dst);
}
// Skip the unit tests which create a splitter that reads and
// writes the data without touching disk.
// TestHLogSplit#testThreading is an example.
if (rootFs.exists(wap.p)) {
if (!rootFs.rename(wap.p, dst)) {
if (walFS.exists(wap.p)) {
if (!walFS.rename(wap.p, dst)) {
throw new IOException("Failed renaming " + wap.p + " to " + dst);
}
LOG.info("Rename " + wap.p + " to " + dst);
@ -1520,12 +1511,12 @@ public class WALSplitter {
if (regionedits == null) {
return null;
}
FileSystem rootFs = FileSystem.get(conf);
if (rootFs.exists(regionedits)) {
FileSystem walFs = FSUtils.getWALFileSystem(conf);
if (walFs.exists(regionedits)) {
LOG.warn("Found old edits file. It could be the "
+ "result of a previous failed split attempt. Deleting " + regionedits + ", length="
+ rootFs.getFileStatus(regionedits).getLen());
if (!rootFs.delete(regionedits, false)) {
+ walFs.getFileStatus(regionedits).getLen());
if (!walFs.delete(regionedits, false)) {
LOG.warn("Failed delete of old {}", regionedits);
}
}

View File

@ -67,7 +67,6 @@ import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.Region;
@ -224,10 +223,11 @@ public abstract class AbstractTestDLS {
int count = 0;
for (RegionInfo hri : regions) {
Path tdir = FSUtils.getTableDir(rootdir, table);
Path tdir = FSUtils.getWALTableDir(conf, table);
@SuppressWarnings("deprecation")
Path editsdir = WALSplitter
.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
.getRegionDirRecoveredEditsDir(FSUtils.getWALRegionDir(conf,
tableName, hri.getEncodedName()));
LOG.debug("checking edits dir " + editsdir);
FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
@Override

View File

@ -706,7 +706,7 @@ public class TestHRegion {
for (HStore store : region.getStores()) {
maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1);
}
long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
assertEquals(maxSeqId, seqId);
region.getMVCC().advanceTo(seqId);
Get get = new Get(row);
@ -758,7 +758,7 @@ public class TestHRegion {
for (HStore store : region.getStores()) {
maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1);
}
long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
assertEquals(maxSeqId, seqId);
region.getMVCC().advanceTo(seqId);
Get get = new Get(row);
@ -802,7 +802,7 @@ public class TestHRegion {
for (HStore store : region.getStores()) {
maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId);
}
long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, null);
long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, null);
assertEquals(minSeqId, seqId);
}
@ -856,7 +856,7 @@ public class TestHRegion {
for (HStore store : region.getStores()) {
maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1);
}
long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
assertEquals(maxSeqId, seqId);
// assert that the files are flushed

View File

@ -869,7 +869,7 @@ public abstract class AbstractTestWALReplay {
final TableName tableName = TableName.valueOf(currentTest.getMethodName());
final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
final Path basedir =
FSUtils.getTableDir(this.hbaseRootDir, tableName);
FSUtils.getWALTableDir(conf, tableName);
deleteDir(basedir);
final byte[] rowName = tableName.getName();
final int countPerFamily = 10;
@ -902,7 +902,7 @@ public abstract class AbstractTestWALReplay {
WALSplitter.splitLogFile(hbaseRootDir, listStatus[0],
this.fs, this.conf, null, null, null, wals);
FileStatus[] listStatus1 = this.fs.listStatus(
new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(),
new Path(FSUtils.getWALTableDir(conf, tableName), new Path(hri.getEncodedName(),
"recovered.edits")), new PathFilter() {
@Override
public boolean accept(Path p) {
@ -929,13 +929,13 @@ public abstract class AbstractTestWALReplay {
public void testDatalossWhenInputError() throws Exception {
final TableName tableName = TableName.valueOf("testDatalossWhenInputError");
final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
final Path basedir = FSUtils.getWALTableDir(conf, tableName);
deleteDir(basedir);
final byte[] rowName = tableName.getName();
final int countPerFamily = 10;
final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
Path regionDir = region1.getRegionFileSystem().getRegionDir();
Path regionDir = region1.getWALRegionDir();
HBaseTestingUtility.closeRegionAndWAL(region1);
WAL wal = createWAL(this.conf, hbaseRootDir, logName);

View File

@ -49,13 +49,13 @@ public class TestReadWriteSeqIdFiles {
private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
private static FileSystem FS;
private static FileSystem walFS;
private static Path REGION_DIR;
@BeforeClass
public static void setUp() throws IOException {
FS = FileSystem.getLocal(UTIL.getConfiguration());
walFS = FileSystem.getLocal(UTIL.getConfiguration());
REGION_DIR = UTIL.getDataTestDir();
}
@ -66,20 +66,20 @@ public class TestReadWriteSeqIdFiles {
@Test
public void test() throws IOException {
WALSplitter.writeRegionSequenceIdFile(FS, REGION_DIR, 1000L);
assertEquals(1000L, WALSplitter.getMaxRegionSequenceId(FS, REGION_DIR));
WALSplitter.writeRegionSequenceIdFile(FS, REGION_DIR, 2000L);
assertEquals(2000L, WALSplitter.getMaxRegionSequenceId(FS, REGION_DIR));
WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 1000L);
assertEquals(1000L, WALSplitter.getMaxRegionSequenceId(walFS, REGION_DIR));
WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 2000L);
assertEquals(2000L, WALSplitter.getMaxRegionSequenceId(walFS, REGION_DIR));
// can not write a sequence id which is smaller
try {
WALSplitter.writeRegionSequenceIdFile(FS, REGION_DIR, 1500L);
WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 1500L);
} catch (IOException e) {
// expected
LOG.info("Expected error", e);
}
Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(REGION_DIR);
FileStatus[] files = FSUtils.listStatus(FS, editsdir, new PathFilter() {
FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() {
@Override
public boolean accept(Path p) {
return WALSplitter.isSequenceIdFile(p);
@ -89,7 +89,7 @@ public class TestReadWriteSeqIdFiles {
assertEquals(1, files.length);
// verify all seqId files aren't treated as recovered.edits files
NavigableSet<Path> recoveredEdits = WALSplitter.getSplitEditFilesSorted(FS, REGION_DIR);
NavigableSet<Path> recoveredEdits = WALSplitter.getSplitEditFilesSorted(walFS, REGION_DIR);
assertEquals(0, recoveredEdits.size());
}
}

View File

@ -181,7 +181,7 @@ public class TestWALFactory {
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
final int howmany = 3;
RegionInfo[] infos = new RegionInfo[3];
Path tabledir = FSUtils.getTableDir(hbaseDir, tableName);
Path tabledir = FSUtils.getWALTableDir(conf, tableName);
fs.mkdirs(tabledir);
for (int i = 0; i < howmany; i++) {
infos[i] = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i))

View File

@ -253,9 +253,9 @@ public class TestWALSplit {
}
LOG.debug(Objects.toString(ls));
LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
LOG.info("Finished splitting out from under zombie.");
Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
Path[] logfiles = getLogForRegion(TABLE_NAME, region);
assertEquals("wrong number of split files for region", numWriters, logfiles.length);
int count = 0;
for (Path logfile: logfiles) {
@ -440,9 +440,9 @@ public class TestWALSplit {
generateWALs(1, 10, -1, 0);
useDifferentDFSClient();
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
assertEquals(1, splitLog.length);
assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
@ -456,9 +456,9 @@ public class TestWALSplit {
generateWALs(1, 10, -1, 100);
useDifferentDFSClient();
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
assertEquals(1, splitLog.length);
assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
@ -483,13 +483,13 @@ public class TestWALSplit {
writer.close();
useDifferentDFSClient();
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
// original log should have 10 test edits, 10 region markers, 1 compaction marker
assertEquals(21, countWAL(originalLog));
Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, hri.getEncodedName());
Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName());
assertEquals(1, splitLog.length);
assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
@ -504,10 +504,10 @@ public class TestWALSplit {
private int splitAndCount(final int expectedFiles, final int expectedEntries)
throws IOException {
useDifferentDFSClient();
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
int result = 0;
for (String region : REGIONS) {
Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
Path[] logfiles = getLogForRegion(TABLE_NAME, region);
assertEquals(expectedFiles, logfiles.length);
int count = 0;
for (Path logfile: logfiles) {
@ -640,7 +640,7 @@ public class TestWALSplit {
walDirContents.add(status.getPath().getName());
}
useDifferentDFSClient();
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
return walDirContents;
} finally {
conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
@ -681,9 +681,9 @@ public class TestWALSplit {
corruptWAL(c1, corruption, true);
useDifferentDFSClient();
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
assertEquals(1, splitLog.length);
int actualCount = 0;
@ -717,7 +717,7 @@ public class TestWALSplit {
conf.setBoolean(HBASE_SKIP_ERRORS, false);
generateWALs(-1);
useDifferentDFSClient();
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
}
@ -733,7 +733,7 @@ public class TestWALSplit {
throws IOException {
generateWALs(-1);
useDifferentDFSClient();
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
FileStatus [] statuses = null;
try {
statuses = fs.listStatus(WALDIR);
@ -763,7 +763,7 @@ public class TestWALSplit {
try {
InstrumentedLogWriter.activateFailure = true;
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
} catch (IOException e) {
assertTrue(e.getMessage().
contains("This exception is instrumented and should only be thrown for testing"));
@ -784,7 +784,7 @@ public class TestWALSplit {
Path regiondir = new Path(TABLEDIR, region);
fs.delete(regiondir, true);
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
assertFalse(fs.exists(regiondir));
}
@ -861,7 +861,7 @@ public class TestWALSplit {
useDifferentDFSClient();
try {
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
assertFalse(fs.exists(WALDIR));
} catch (IOException e) {
@ -1085,7 +1085,7 @@ public class TestWALSplit {
Path regiondir = new Path(TABLEDIR, REGION);
LOG.info("Region directory is" + regiondir);
fs.delete(regiondir, true);
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
assertFalse(fs.exists(regiondir));
}
@ -1098,7 +1098,7 @@ public class TestWALSplit {
injectEmptyFile(".empty", true);
useDifferentDFSClient();
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
assertFalse(fs.exists(tdir));
@ -1123,7 +1123,7 @@ public class TestWALSplit {
Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
useDifferentDFSClient();
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
assertEquals(1, fs.listStatus(corruptDir).length);
@ -1151,14 +1151,14 @@ public class TestWALSplit {
@Override
protected Writer createWriter(Path logfile)
throws IOException {
Writer writer = wals.createRecoveredEditsWriter(this.fs, logfile);
Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile);
// After creating writer, simulate region's
// replayRecoveredEditsIfAny() which gets SplitEditFiles of this
// region and delete them, excluding files with '.temp' suffix.
NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
if (files != null && !files.isEmpty()) {
for (Path file : files) {
if (!this.fs.delete(file, false)) {
if (!this.walFS.delete(file, false)) {
LOG.error("Failed delete of " + file);
} else {
LOG.debug("Deleted recovered.edits file=" + file);
@ -1237,9 +1237,9 @@ public class TestWALSplit {
private Path[] getLogForRegion(Path rootdir, TableName table, String region)
private Path[] getLogForRegion(TableName table, String region)
throws IOException {
Path tdir = FSUtils.getTableDir(rootdir, table);
Path tdir = FSUtils.getWALTableDir(conf, table);
@SuppressWarnings("deprecation")
Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
Bytes.toString(Bytes.toBytes(region))));