HBASE-20734 Colocate recovered edits directory with hbase.wal.dir

Amending-Author: Reid Chan <reidchan@apache.org>
Signed-off-by: Reid Chan <reidchan@apache.org>
Author: Zach York, 2018-06-27 16:18:53 -07:00 (committed by Reid Chan)
parent ba8a252167
commit 504286d55c
14 changed files with 299 additions and 207 deletions

CommonFSUtils.java

@@ -420,6 +420,34 @@ public abstract class CommonFSUtils {
     return true;
   }
 
+  /**
+   * Returns the WAL region directory based on the given table name and region name
+   * @param conf configuration to determine WALRootDir
+   * @param tableName Table that the region is under
+   * @param encodedRegionName Region name used for creating the final region directory
+   * @return the region directory used to store WALs under the WALRootDir
+   * @throws IOException if there is an exception determining the WALRootDir
+   */
+  public static Path getWALRegionDir(final Configuration conf,
+      final TableName tableName, final String encodedRegionName)
+      throws IOException {
+    return new Path(getWALTableDir(conf, tableName),
+        encodedRegionName);
+  }
+
+  /**
+   * Returns the Table directory under the WALRootDir for the specified table name
+   * @param conf configuration used to get the WALRootDir
+   * @param tableName Table to get the directory for
+   * @return a path to the WAL table directory for the specified table
+   * @throws IOException if there is an exception determining the WALRootDir
+   */
+  public static Path getWALTableDir(final Configuration conf, final TableName tableName)
+      throws IOException {
+    return new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()),
+        tableName.getQualifierAsString());
+  }
+
   /**
    * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
    * path rootdir
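
A minimal usage sketch of the two helpers added above (not part of the patch; the WAL dir value, table name and encoded region name are invented, and only the methods shown in this hunk plus the existing getWALRootDir are assumed):

// Usage sketch: resolving the WAL-rooted table and region directories.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.CommonFSUtils;

public class WalDirLayoutSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // The WAL root may point at a different filesystem than hbase.rootdir.
    conf.set("hbase.wal.dir", "hdfs://wal-cluster/hbase-wal");
    TableName tn = TableName.valueOf("ns", "tbl");
    // <walRoot>/ns/tbl
    Path walTableDir = CommonFSUtils.getWALTableDir(conf, tn);
    // <walRoot>/ns/tbl/<encodedRegionName>; recovered edits now live below this
    Path walRegionDir = CommonFSUtils.getWALRegionDir(conf, tn, "abcdef1234567890abcdef1234567890");
    System.out.println(walTableDir + " " + walRegionDir);
  }
}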

MergeTableRegionsProcedure.java

@@ -816,14 +816,16 @@ public class MergeTableRegionsProcedure
   }
 
   private void writeMaxSequenceIdFile(MasterProcedureEnv env) throws IOException {
-    FileSystem fs = env.getMasterServices().getMasterFileSystem().getFileSystem();
+    FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem();
     long maxSequenceId = -1L;
     for (RegionInfo region : regionsToMerge) {
       maxSequenceId =
-          Math.max(maxSequenceId, WALSplitter.getMaxRegionSequenceId(fs, getRegionDir(env, region)));
+          Math.max(maxSequenceId, WALSplitter.getMaxRegionSequenceId(
+              walFS, getWALRegionDir(env, region)));
     }
     if (maxSequenceId > 0) {
-      WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, mergedRegion), maxSequenceId);
+      WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, mergedRegion),
+          maxSequenceId);
     }
   }
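
A note on the bookkeeping this procedure relies on: WALSplitter.getMaxRegionSequenceId reads the marker files that writeRegionSequenceIdFile creates under a region's recovered-edits directory, which after this patch sits below the WAL root. A round-trip sketch, assuming only the patched signatures shown in this commit (the table and encoded region name are invented):

// Sequence-id marker round trip (illustrative only).
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WALSplitter;

public class SeqIdRoundTrip {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem walFS = FSUtils.getWALFileSystem(conf);
    Path regionDir = FSUtils.getWALRegionDir(conf,
        TableName.valueOf("ns", "tbl"), "abcdef1234567890abcdef1234567890");
    // Record that everything up to sequence id 42 is durable, as the merge
    // procedure above does for the merged region.
    WALSplitter.writeRegionSequenceIdFile(walFS, regionDir, 42L);
    System.out.println(WALSplitter.getMaxRegionSequenceId(walFS, regionDir)); // 42
  }
}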

RegionStateStore.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.assignment;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellBuilderFactory;
 import org.apache.hadoop.hbase.CellBuilderType;
@@ -34,13 +35,13 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -216,9 +217,10 @@ public class RegionStateStore {
   }
 
   private long getOpenSeqNumForParentRegion(RegionInfo region) throws IOException {
-    MasterFileSystem mfs = master.getMasterFileSystem();
+    FileSystem walFS = master.getMasterWalManager().getFileSystem();
     long maxSeqId =
-        WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
+        WALSplitter.getMaxRegionSequenceId(walFS, FSUtils.getWALRegionDir(
+            master.getConfiguration(), region.getTable(), region.getEncodedName()));
     return maxSeqId > 0 ? maxSeqId + 1 : HConstants.NO_SEQNUM;
   }

SplitTableRegionProcedure.java

@@ -875,12 +875,14 @@ public class SplitTableRegionProcedure
   }
 
   private void writeMaxSequenceIdFile(MasterProcedureEnv env) throws IOException {
-    FileSystem fs = env.getMasterServices().getMasterFileSystem().getFileSystem();
+    FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem();
     long maxSequenceId =
-        WALSplitter.getMaxRegionSequenceId(fs, getRegionDir(env, getParentRegion()));
+        WALSplitter.getMaxRegionSequenceId(walFS, getWALRegionDir(env, getParentRegion()));
     if (maxSequenceId > 0) {
-      WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, daughter_1_RI), maxSequenceId);
-      WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, daughter_2_RI), maxSequenceId);
+      WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughter_1_RI),
+          maxSequenceId);
+      WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughter_2_RI),
+          maxSequenceId);
     }
   }

AbstractStateMachineTableProcedure.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.master.TableStateManager;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -130,8 +131,10 @@ public abstract class AbstractStateMachineTableProcedure<TState>
     }
   }
 
-  protected final Path getRegionDir(MasterProcedureEnv env, RegionInfo region) throws IOException {
-    return env.getMasterServices().getMasterFileSystem().getRegionDir(region);
+  protected final Path getWALRegionDir(MasterProcedureEnv env, RegionInfo region)
+      throws IOException {
+    return FSUtils.getWALRegionDir(env.getMasterConfiguration(),
+        region.getTable(), region.getEncodedName());
   }
 
   /**

DisableTableProcedure.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.master.procedure;
 
 import java.io.IOException;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MetaTableAccessor;
@@ -30,7 +31,6 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.constraint.ConstraintException;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
-import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.master.TableStateManager;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -113,13 +113,13 @@ public class DisableTableProcedure
       case DISABLE_TABLE_ADD_REPLICATION_BARRIER:
         if (env.getMasterServices().getTableDescriptors().get(tableName)
             .hasGlobalReplicationScope()) {
-          MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
+          FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem();
           try (BufferedMutator mutator = env.getMasterServices().getConnection()
               .getBufferedMutator(TableName.META_TABLE_NAME)) {
             for (RegionInfo region : env.getAssignmentManager().getRegionStates()
                 .getRegionsOfTable(tableName)) {
               long maxSequenceId =
-                  WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
+                  WALSplitter.getMaxRegionSequenceId(walFS, getWALRegionDir(env, region));
               long openSeqNum = maxSequenceId > 0 ? maxSequenceId + 1 : HConstants.NO_SEQNUM;
               mutator.mutate(MetaTableAccessor.makePutForReplicationBarrier(region, openSeqNum,
                   EnvironmentEdgeManager.currentTime()));

HRegion.java

@@ -330,6 +330,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   private final int rowLockWaitDuration;
   static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
 
+  private Path regionDir;
+  private FileSystem walFS;
+
   // The internal wait duration to acquire a lock before read/update
   // from the region. It is not per row. The purpose of this wait time
   // is to avoid waiting a long time while the region is busy, so that
@@ -943,7 +946,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       stores.forEach(HStore::startReplayingFromWAL);
       // Recover any edits if available.
       maxSeqId = Math.max(maxSeqId,
-          replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
+          replayRecoveredEditsIfAny(maxSeqIdInStores, reporter, status));
       // Make sure mvcc is up to max.
       this.mvcc.advanceTo(maxSeqId);
     } finally {
@@ -986,14 +989,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // Use maximum of log sequenceid or that which was found in stores
     // (particularly if no recovered edits, seqid will be -1).
     long maxSeqIdFromFile =
-        WALSplitter.getMaxRegionSequenceId(fs.getFileSystem(), fs.getRegionDir());
+        WALSplitter.getMaxRegionSequenceId(getWalFileSystem(), getWALRegionDir());
     long nextSeqId = Math.max(maxSeqId, maxSeqIdFromFile) + 1;
     // The openSeqNum will always be increase even for read only region, as we rely on it to
     // determine whether a region has been successfully reopend, so here we always need to update
     // the max sequence id file.
     if (RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
       LOG.debug("writing seq id for {}", this.getRegionInfo().getEncodedName());
-      WALSplitter.writeRegionSequenceIdFile(fs.getFileSystem(), fs.getRegionDir(), nextSeqId - 1);
+      WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(), nextSeqId - 1);
     }
 
     LOG.info("Opened {}; next sequenceid={}", this.getRegionInfo().getShortNameToLog(), nextSeqId);
@@ -1140,11 +1143,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionEventDesc,
         mvcc);
 
-    // Store SeqId in HDFS when a region closes
+    // Store SeqId in WAL FileSystem when a region closes
     // checking region folder exists is due to many tests which delete the table folder while a
     // table is still online
-    if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) {
-      WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(),
+    if (getWalFileSystem().exists(getWALRegionDir())) {
+      WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(),
           mvcc.getReadPoint());
     }
   }
@@ -1863,6 +1866,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return this.fs;
   }
 
+  /** @return the WAL {@link HRegionFileSystem} used by this region */
+  HRegionFileSystem getRegionWALFileSystem() throws IOException {
+    return new HRegionFileSystem(conf, getWalFileSystem(),
+        FSUtils.getWALTableDir(conf, htableDescriptor.getTableName()), fs.getRegionInfo());
+  }
+
+  /** @return the WAL {@link FileSystem} being used by this region */
+  FileSystem getWalFileSystem() throws IOException {
+    if (walFS == null) {
+      walFS = FSUtils.getWALFileSystem(conf);
+    }
+    return walFS;
+  }
+
+  /**
+   * @return the Region directory under WALRootDirectory
+   * @throws IOException if there is an error getting WALRootDir
+   */
+  @VisibleForTesting
+  public Path getWALRegionDir() throws IOException {
+    if (regionDir == null) {
+      regionDir = FSUtils.getWALRegionDir(conf, getRegionInfo().getTable(),
+          getRegionInfo().getEncodedName());
+    }
+    return regionDir;
+  }
+
   @Override
   public long getEarliestFlushTimeForAllStores() {
     return Collections.min(lastStoreFlushTimeMap.values());
@@ -4465,8 +4495,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
    * @throws IOException
    */
-  protected long replayRecoveredEditsIfAny(final Path regiondir,
-      Map<byte[], Long> maxSeqIdInStores,
+  protected long replayRecoveredEditsIfAny(Map<byte[], Long> maxSeqIdInStores,
       final CancelableProgressable reporter, final MonitoredTask status)
       throws IOException {
     long minSeqIdForTheRegion = -1;
@@ -4477,14 +4506,75 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
     long seqid = minSeqIdForTheRegion;
-    FileSystem fs = this.fs.getFileSystem();
-    NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Found " + (files == null ? 0 : files.size())
-          + " recovered edits file(s) under " + regiondir);
-    }
-    if (files == null || files.isEmpty()) return seqid;
+    FileSystem walFS = getWalFileSystem();
+    FileSystem rootFS = getFilesystem();
+    Path regionDir = getWALRegionDir();
+    Path defaultRegionDir = getRegionDir(FSUtils.getRootDir(conf), getRegionInfo());
+
+    // This is to ensure backwards compatability with HBASE-20723 where recovered edits can appear
+    // under the root dir even if walDir is set.
+    NavigableSet<Path> filesUnderRootDir = null;
+    if (!regionDir.equals(defaultRegionDir)) {
+      filesUnderRootDir =
+          WALSplitter.getSplitEditFilesSorted(rootFS, defaultRegionDir);
+      seqid = Math.max(seqid,
+          replayRecoveredEditsForPaths(minSeqIdForTheRegion, rootFS, filesUnderRootDir, reporter,
+              defaultRegionDir));
+    }
+
+    NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(walFS, regionDir);
+    seqid = Math.max(seqid, replayRecoveredEditsForPaths(minSeqIdForTheRegion, walFS,
+        files, reporter, regionDir));
+
+    if (seqid > minSeqIdForTheRegion) {
+      // Then we added some edits to memory. Flush and cleanup split edit files.
+      internalFlushcache(null, seqid, stores.values(), status, false, FlushLifeCycleTracker.DUMMY);
+    }
+    // Now delete the content of recovered edits. We're done w/ them.
+    if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
+      // For debugging data loss issues!
+      // If this flag is set, make use of the hfile archiving by making recovered.edits a fake
+      // column family. Have to fake out file type too by casting our recovered.edits as storefiles
+      String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regionDir).getName();
+      Set<HStoreFile> fakeStoreFiles = new HashSet<>(files.size());
+      for (Path file: files) {
+        fakeStoreFiles.add(
+            new HStoreFile(walFS, file, this.conf, null, null, true));
+      }
+      getRegionWALFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
+    } else {
+      if (filesUnderRootDir != null) {
+        for (Path file : filesUnderRootDir) {
+          if (!rootFS.delete(file, false)) {
+            LOG.error("Failed delete of {} from under the root directory.", file);
+          } else {
+            LOG.debug("Deleted recovered.edits under root directory. file=" + file);
+          }
+        }
+      }
+      for (Path file: files) {
+        if (!walFS.delete(file, false)) {
+          LOG.error("Failed delete of " + file);
+        } else {
+          LOG.debug("Deleted recovered.edits file=" + file);
+        }
+      }
+    }
+    return seqid;
+  }
+
+  private long replayRecoveredEditsForPaths(long minSeqIdForTheRegion, FileSystem fs,
+      final NavigableSet<Path> files, final CancelableProgressable reporter, final Path regionDir)
+      throws IOException {
+    long seqid = minSeqIdForTheRegion;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Found " + (files == null ? 0 : files.size())
+          + " recovered edits file(s) under " + regionDir);
+    }
+
+    if (files == null || files.isEmpty()) {
+      return minSeqIdForTheRegion;
+    }
+
     for (Path edits: files) {
       if (edits == null || !fs.exists(edits)) {
@@ -4499,8 +4589,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       if (maxSeqId <= minSeqIdForTheRegion) {
         if (LOG.isDebugEnabled()) {
-          String msg = "Maximum sequenceid for this wal is " + maxSeqId
-            + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
-            + ", skipped the whole file, path=" + edits;
+          String msg = "Maximum sequenceid for this wal is " + maxSeqId
+              + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
+              + ", skipped the whole file, path=" + edits;
           LOG.debug(msg);
         }
         continue;
@@ -4509,7 +4599,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       try {
         // replay the edits. Replay can return -1 if everything is skipped, only update
         // if seqId is greater
-        seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
+        seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter, fs));
       } catch (IOException e) {
         boolean skipErrors = conf.getBoolean(
             HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
@@ -4519,10 +4609,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         if (conf.get("hbase.skip.errors") != null) {
           LOG.warn(
-              "The property 'hbase.skip.errors' has been deprecated. Please use " +
-              HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
+              "The property 'hbase.skip.errors' has been deprecated. Please use " +
+                  HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
         }
         if (skipErrors) {
-          Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
+          Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits);
           LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
               + "=true so continuing. Renamed " + edits +
               " as " + p, e);
@@ -4531,31 +4621,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         }
       }
     }
-    if (seqid > minSeqIdForTheRegion) {
-      // Then we added some edits to memory. Flush and cleanup split edit files.
-      internalFlushcache(null, seqid, stores.values(), status, false, FlushLifeCycleTracker.DUMMY);
-    }
-    // Now delete the content of recovered edits. We're done w/ them.
-    if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
-      // For debugging data loss issues!
-      // If this flag is set, make use of the hfile archiving by making recovered.edits a fake
-      // column family. Have to fake out file type too by casting our recovered.edits as storefiles
-      String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName();
-      Set<HStoreFile> fakeStoreFiles = new HashSet<>(files.size());
-      for (Path file: files) {
-        fakeStoreFiles.add(
-            new HStoreFile(getRegionFileSystem().getFileSystem(), file, this.conf, null, null, true));
-      }
-      getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
-    } else {
-      for (Path file: files) {
-        if (!fs.delete(file, false)) {
-          LOG.error("Failed delete of " + file);
-        } else {
-          LOG.debug("Deleted recovered.edits file=" + file);
-        }
-      }
-    }
 
     return seqid;
   }
@@ -4569,12 +4634,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @throws IOException
    */
   private long replayRecoveredEdits(final Path edits,
-      Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
+      Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter, FileSystem fs)
       throws IOException {
     String msg = "Replaying edits from " + edits;
     LOG.info(msg);
     MonitoredTask status = TaskMonitor.get().createStatus(msg);
-    FileSystem fs = this.fs.getFileSystem();
 
     status.setStatus("Opening recovered edits");
     WAL.Reader reader = null;
@@ -4728,7 +4792,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         coprocessorHost.postReplayWALs(this.getRegionInfo(), edits);
       }
     } catch (EOFException eof) {
-      Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
+      Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits);
       msg = "EnLongAddered EOF. Most likely due to Master failure during " +
           "wal splitting, so we have this data in another edit. " +
           "Continuing, but renaming " + edits + " as " + p;
@@ -4738,7 +4802,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // If the IOE resulted from bad file format,
       // then this problem is idempotent and retrying won't help
       if (ioe.getCause() instanceof ParseException) {
-        Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
+        Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits);
         msg = "File corruption enLongAddered! " +
             "Continuing, but renaming " + edits + " as " + p;
         LOG.warn(msg, ioe);
@@ -8022,7 +8086,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      51 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
+      53 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
       (14 * Bytes.SIZEOF_LONG) +
       3 * Bytes.SIZEOF_BOOLEAN);
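
To summarize the compatibility logic in the rewritten replayRecoveredEditsIfAny: when hbase.wal.dir differs from hbase.rootdir, recovered edits written before this change (see HBASE-20723) may still sit under the root directory, so the region replays the root-dir location first and then the WAL-dir location. A standalone sketch of that lookup order (the helper method itself is invented for illustration; the FSUtils calls are the ones used by the patch):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.FSUtils;

public class RecoveredEditsLocations {
  // Returns the region directories to scan for recovered.edits, legacy
  // root-dir location first (mirrors the order used by HRegion above).
  static List<Path> regionDirsToReplay(Configuration conf, TableName table,
      String encodedName) throws IOException {
    Path walRegionDir = FSUtils.getWALRegionDir(conf, table, encodedName);
    Path defaultRegionDir = new Path(
        FSUtils.getTableDir(FSUtils.getRootDir(conf), table), encodedName);
    List<Path> dirs = new ArrayList<>();
    if (!walRegionDir.equals(defaultRegionDir)) {
      dirs.add(defaultRegionDir); // pre-HBASE-20734 layout, replayed first
    }
    dirs.add(walRegionDir); // colocated with hbase.wal.dir
    return dirs;
  }
}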

WALSplitter.java

@@ -116,7 +116,7 @@ public class WALSplitter {
   // Parameters for split process
   protected final Path walDir;
-  protected final FileSystem fs;
+  protected final FileSystem walFS;
   protected final Configuration conf;
 
   // Major subcomponents of the split process.
@@ -149,14 +149,14 @@ public class WALSplitter {
   @VisibleForTesting
   WALSplitter(final WALFactory factory, Configuration conf, Path walDir,
-      FileSystem fs, LastSequenceId idChecker,
+      FileSystem walFS, LastSequenceId idChecker,
       SplitLogWorkerCoordination splitLogWorkerCoordination) {
     this.conf = HBaseConfiguration.create(conf);
     String codecClassName = conf
         .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
     this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
     this.walDir = walDir;
-    this.fs = fs;
+    this.walFS = walFS;
     this.sequenceIdChecker = idChecker;
     this.splitLogWorkerCoordination = splitLogWorkerCoordination;
@@ -186,11 +186,11 @@ public class WALSplitter {
    * <p>
    * @return false if it is interrupted by the progress-able.
    */
-  public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem fs,
+  public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
       Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
       SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory)
       throws IOException {
-    WALSplitter s = new WALSplitter(factory, conf, walDir, fs, idChecker,
+    WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, idChecker,
         splitLogWorkerCoordination);
     return s.splitLogFile(logfile, reporter);
   }
@@ -201,13 +201,13 @@ public class WALSplitter {
   // which uses this method to do log splitting.
   @VisibleForTesting
   public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
-      FileSystem fs, Configuration conf, final WALFactory factory) throws IOException {
+      FileSystem walFS, Configuration conf, final WALFactory factory) throws IOException {
     final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
         Collections.singletonList(logDir), null);
     List<Path> splits = new ArrayList<>();
     if (ArrayUtils.isNotEmpty(logfiles)) {
       for (FileStatus logfile: logfiles) {
-        WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null);
+        WALSplitter s = new WALSplitter(factory, conf, rootDir, walFS, null, null);
         if (s.splitLogFile(logfile, null)) {
           finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
           if (s.outputSink.splits != null) {
@@ -216,7 +216,7 @@ public class WALSplitter {
         }
       }
     }
-    if (!fs.delete(logDir, true)) {
+    if (!walFS.delete(logDir, true)) {
       throw new IOException("Unable to delete src dir: " + logDir);
     }
     return splits;
@@ -322,10 +322,10 @@ public class WALSplitter {
       LOG.warn("Could not parse, corrupted WAL={}", logPath, e);
       if (splitLogWorkerCoordination != null) {
         // Some tests pass in a csm of null.
-        splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), fs);
+        splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS);
       } else {
         // for tests only
-        ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), fs);
+        ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS);
       }
       isCorrupted = true;
     } catch (IOException e) {
@@ -373,31 +373,30 @@ public class WALSplitter {
    */
   public static void finishSplitLogFile(String logfile,
       Configuration conf) throws IOException {
-    Path rootdir = FSUtils.getWALRootDir(conf);
-    Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
+    Path walDir = FSUtils.getWALRootDir(conf);
+    Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME);
     Path logPath;
-    if (FSUtils.isStartingWithPath(rootdir, logfile)) {
+    if (FSUtils.isStartingWithPath(walDir, logfile)) {
       logPath = new Path(logfile);
     } else {
-      logPath = new Path(rootdir, logfile);
+      logPath = new Path(walDir, logfile);
     }
-    finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
+    finishSplitLogFile(walDir, oldLogDir, logPath, conf);
   }
 
-  private static void finishSplitLogFile(Path rootdir, Path oldLogDir,
+  private static void finishSplitLogFile(Path walDir, Path oldLogDir,
       Path logPath, Configuration conf) throws IOException {
     List<Path> processedLogs = new ArrayList<>();
     List<Path> corruptedLogs = new ArrayList<>();
-    FileSystem fs;
-    fs = rootdir.getFileSystem(conf);
-    if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
+    FileSystem walFS = walDir.getFileSystem(conf);
+    if (ZKSplitLog.isCorrupted(walDir, logPath.getName(), walFS)) {
       corruptedLogs.add(logPath);
     } else {
       processedLogs.add(logPath);
     }
-    archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
-    Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
-    fs.delete(stagingDir, true);
+    archiveLogs(corruptedLogs, processedLogs, oldLogDir, walFS, conf);
+    Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, logPath.getName());
+    walFS.delete(stagingDir, true);
   }
 
   /**
@@ -408,30 +407,30 @@ public class WALSplitter {
    * @param corruptedLogs
    * @param processedLogs
    * @param oldLogDir
-   * @param fs
+   * @param walFS WAL FileSystem to archive files on.
    * @param conf
    * @throws IOException
    */
   private static void archiveLogs(
       final List<Path> corruptedLogs,
       final List<Path> processedLogs, final Path oldLogDir,
-      final FileSystem fs, final Configuration conf) throws IOException {
+      final FileSystem walFS, final Configuration conf) throws IOException {
     final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
     if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
       LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}",
           corruptDir);
     }
-    if (!fs.mkdirs(corruptDir)) {
+    if (!walFS.mkdirs(corruptDir)) {
       LOG.info("Unable to mkdir {}", corruptDir);
     }
-    fs.mkdirs(oldLogDir);
+    walFS.mkdirs(oldLogDir);
 
     // this method can get restarted or called multiple times for archiving
     // the same log files.
     for (Path corrupted : corruptedLogs) {
       Path p = new Path(corruptDir, corrupted.getName());
-      if (fs.exists(corrupted)) {
-        if (!fs.rename(corrupted, p)) {
+      if (walFS.exists(corrupted)) {
+        if (!walFS.rename(corrupted, p)) {
           LOG.warn("Unable to move corrupted log {} to {}", corrupted, p);
         } else {
           LOG.warn("Moved corrupted log {} to {}", corrupted, p);
@@ -441,8 +440,8 @@ public class WALSplitter {
     for (Path p : processedLogs) {
       Path newPath = AbstractFSWAL.getWALArchivePath(oldLogDir, p);
-      if (fs.exists(p)) {
-        if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
+      if (walFS.exists(p)) {
+        if (!FSUtils.renameAndSetModifyTime(walFS, p, newPath)) {
          LOG.warn("Unable to move {} to {}", p, newPath);
        } else {
          LOG.info("Archived processed log {} to {}", p, newPath);
@@ -468,35 +467,29 @@ public class WALSplitter {
   @VisibleForTesting
   static Path getRegionSplitEditsPath(final Entry logEntry, String fileNameBeingSplit,
       String tmpDirName, Configuration conf) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    Path rootDir = FSUtils.getRootDir(conf);
-    Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTableName());
+    FileSystem walFS = FSUtils.getWALFileSystem(conf);
+    Path tableDir = FSUtils.getWALTableDir(conf, logEntry.getKey().getTableName());
     String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
-    Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
-    Path dir = getRegionDirRecoveredEditsDir(regiondir);
+    Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionName);
+    Path dir = getRegionDirRecoveredEditsDir(regionDir);
 
-    if (!fs.exists(regiondir)) {
-      LOG.info("This region's directory does not exist: {}."
-          + "It is very likely that it was already split so it is "
-          + "safe to discard those edits.", regiondir);
-      return null;
-    }
-
-    if (fs.exists(dir) && fs.isFile(dir)) {
+    if (walFS.exists(dir) && walFS.isFile(dir)) {
       Path tmp = new Path(tmpDirName);
-      if (!fs.exists(tmp)) {
-        fs.mkdirs(tmp);
+      if (!walFS.exists(tmp)) {
+        walFS.mkdirs(tmp);
       }
       tmp = new Path(tmp,
           HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
       LOG.warn("Found existing old file: {}. It could be some "
           + "leftover of an old installation. It should be a folder instead. "
          + "So moving it to {}", dir, tmp);
-      if (!fs.rename(dir, tmp)) {
+      if (!walFS.rename(dir, tmp)) {
         LOG.warn("Failed to sideline old file {}", dir);
       }
     }
 
-    if (!fs.exists(dir) && !fs.mkdirs(dir)) {
+    if (!walFS.exists(dir) && !walFS.mkdirs(dir)) {
       LOG.warn("mkdir failed on {}", dir);
     }
     // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
@@ -534,34 +527,34 @@ public class WALSplitter {
   private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
 
   /**
-   * @param regiondir
+   * @param regionDir
    *          This regions directory in the filesystem.
    * @return The directory that holds recovered edits files for the region
-   *         <code>regiondir</code>
+   *         <code>regionDir</code>
    */
-  public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
-    return new Path(regiondir, HConstants.RECOVERED_EDITS_DIR);
+  public static Path getRegionDirRecoveredEditsDir(final Path regionDir) {
+    return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR);
   }
 
   /**
    * Check whether there is recovered.edits in the region dir
-   * @param fs FileSystem
+   * @param walFS FileSystem
    * @param conf conf
    * @param regionInfo the region to check
    * @throws IOException IOException
    * @return true if recovered.edits exist in the region dir
    */
-  public static boolean hasRecoveredEdits(final FileSystem fs,
+  public static boolean hasRecoveredEdits(final FileSystem walFS,
       final Configuration conf, final RegionInfo regionInfo) throws IOException {
     // No recovered.edits for non default replica regions
     if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
       return false;
     }
-    Path rootDir = FSUtils.getRootDir(conf);
     //Only default replica region can reach here, so we can use regioninfo
     //directly without converting it to default replica's regioninfo.
-    Path regionDir = HRegion.getRegionDir(rootDir, regionInfo);
-    NavigableSet<Path> files = getSplitEditFilesSorted(fs, regionDir);
+    Path regionDir = FSUtils.getWALRegionDir(conf, regionInfo.getTable(),
+        regionInfo.getEncodedName());
+    NavigableSet<Path> files = getSplitEditFilesSorted(walFS, regionDir);
     return files != null && !files.isEmpty();
   }
@@ -570,19 +563,19 @@ public class WALSplitter {
    * Returns sorted set of edit files made by splitter, excluding files
    * with '.temp' suffix.
    *
-   * @param fs
-   * @param regiondir
-   * @return Files in passed <code>regiondir</code> as a sorted set.
+   * @param walFS WAL FileSystem used to retrieving split edits files.
+   * @param regionDir WAL region dir to look for recovered edits files under.
+   * @return Files in passed <code>regionDir</code> as a sorted set.
    * @throws IOException
    */
-  public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
-      final Path regiondir) throws IOException {
+  public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS,
+      final Path regionDir) throws IOException {
     NavigableSet<Path> filesSorted = new TreeSet<>();
-    Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
-    if (!fs.exists(editsdir)) {
+    Path editsdir = getRegionDirRecoveredEditsDir(regionDir);
+    if (!walFS.exists(editsdir)) {
       return filesSorted;
     }
-    FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
+    FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() {
       @Override
       public boolean accept(Path p) {
         boolean result = false;
@@ -592,7 +585,7 @@ public class WALSplitter {
         // In particular, on error, we'll move aside the bad edit file giving
         // it a timestamp suffix. See moveAsideBadEditsFile.
         Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
-        result = fs.isFile(p) && m.matches();
+        result = walFS.isFile(p) && m.matches();
         // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
         // because it means splitwal thread is writting this file.
         if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
@@ -617,17 +610,17 @@ public class WALSplitter {
   /**
    * Move aside a bad edits file.
    *
-   * @param fs
+   * @param walFS WAL FileSystem used to rename bad edits file.
    * @param edits
    *          Edits file to move aside.
    * @return The name of the moved aside file.
    * @throws IOException
    */
-  public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
+  public static Path moveAsideBadEditsFile(final FileSystem walFS, final Path edits)
       throws IOException {
     Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
         + System.currentTimeMillis());
-    if (!fs.rename(edits, moveAsideName)) {
+    if (!walFS.rename(edits, moveAsideName)) {
       LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
     }
     return moveAsideName;
@@ -646,12 +639,13 @@ public class WALSplitter {
         || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
   }
 
-  private static FileStatus[] getSequenceIdFiles(FileSystem fs, Path regionDir) throws IOException {
+  private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir)
+      throws IOException {
     // TODO: Why are we using a method in here as part of our normal region open where
     // there is no splitting involved? Fix. St.Ack 01/20/2017.
     Path editsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir);
     try {
-      FileStatus[] files = fs.listStatus(editsDir, WALSplitter::isSequenceIdFile);
+      FileStatus[] files = walFS.listStatus(editsDir, WALSplitter::isSequenceIdFile);
       return files != null ? files : new FileStatus[0];
     } catch (FileNotFoundException e) {
       return new FileStatus[0];
@@ -675,16 +669,16 @@ public class WALSplitter {
   /**
    * Get the max sequence id which is stored in the region directory. -1 if none.
    */
-  public static long getMaxRegionSequenceId(FileSystem fs, Path regionDir) throws IOException {
-    return getMaxSequenceId(getSequenceIdFiles(fs, regionDir));
+  public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException {
+    return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir));
   }
 
   /**
    * Create a file with name as region's max sequence id
    */
-  public static void writeRegionSequenceIdFile(FileSystem fs, Path regionDir, long newMaxSeqId)
+  public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId)
       throws IOException {
-    FileStatus[] files = getSequenceIdFiles(fs, regionDir);
+    FileStatus[] files = getSequenceIdFiles(walFS, regionDir);
     long maxSeqId = getMaxSequenceId(files);
     if (maxSeqId > newMaxSeqId) {
       throw new IOException("The new max sequence id " + newMaxSeqId +
@@ -695,7 +689,7 @@ public class WALSplitter {
         newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX);
     if (newMaxSeqId != maxSeqId) {
       try {
-        if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) {
+        if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) {
          throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
        }
        LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId,
@@ -707,7 +701,7 @@ public class WALSplitter {
     // remove old ones
     for (FileStatus status : files) {
       if (!newSeqIdFile.equals(status.getPath())) {
-        fs.delete(status.getPath(), false);
+        walFS.delete(status.getPath(), false);
       }
     }
   }
@@ -734,7 +728,7 @@
     }
     try {
-      FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
+      FSUtils.getInstance(walFS, conf).recoverFileLease(walFS, path, conf, reporter);
       try {
         in = getReader(path, reporter);
       } catch (EOFException e) {
@@ -801,7 +795,7 @@
    */
   protected Writer createWriter(Path logfile)
       throws IOException {
-    return walFactory.createRecoveredEditsWriter(fs, logfile);
+    return walFactory.createRecoveredEditsWriter(walFS, logfile);
   }
 
   /**
@@ -809,7 +803,7 @@
    * @return new Reader instance, caller should close
    */
   protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
-    return walFactory.createReader(fs, curLogFile, reporter);
+    return walFactory.createReader(walFS, curLogFile, reporter);
   }
 
   /**
@@ -1284,10 +1278,10 @@
   }
 
   // delete the one with fewer wal entries
-  private void deleteOneWithFewerEntries(FileSystem rootFs, WriterAndPath wap, Path dst)
+  private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst)
       throws IOException {
     long dstMinLogSeqNum = -1L;
-    try (WAL.Reader reader = walFactory.createReader(fs, dst)) {
+    try (WAL.Reader reader = walFactory.createReader(walFS, dst)) {
       WAL.Entry entry = reader.next();
       if (entry != null) {
         dstMinLogSeqNum = entry.getKey().getSequenceId();
@@ -1299,15 +1293,15 @@
     if (wap.minLogSeqNum < dstMinLogSeqNum) {
       LOG.warn("Found existing old edits file. It could be the result of a previous failed"
           + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
-          + fs.getFileStatus(dst).getLen());
-      if (!fs.delete(dst, false)) {
+          + walFS.getFileStatus(dst).getLen());
+      if (!walFS.delete(dst, false)) {
         LOG.warn("Failed deleting of old {}", dst);
         throw new IOException("Failed deleting of old " + dst);
       }
     } else {
       LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p
-          + ", length=" + rootFs.getFileStatus(wap.p).getLen());
-      if (!rootFs.delete(wap.p, false)) {
+          + ", length=" + walFS.getFileStatus(wap.p).getLen());
+      if (!walFS.delete(wap.p, false)) {
         LOG.warn("Failed deleting of {}", wap.p);
         throw new IOException("Failed deleting of " + wap.p);
       }
@@ -1391,10 +1385,7 @@
     Path closeWriter(String encodedRegionName, WriterAndPath wap,
         List<IOException> thrown) throws IOException{
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Closing " + wap.p);
-      }
-      FileSystem rootFs = FileSystem.get(conf);
+      LOG.trace("Closing " + wap.p);
       try {
         wap.w.close();
       } catch (IOException ioe) {
@@ -1409,7 +1400,7 @@
       }
       if (wap.editsWritten == 0) {
         // just remove the empty recovered.edits file
-        if (rootFs.exists(wap.p) && !rootFs.delete(wap.p, false)) {
+        if (walFS.exists(wap.p) && !walFS.delete(wap.p, false)) {
          LOG.warn("Failed deleting empty " + wap.p);
          throw new IOException("Failed deleting empty " + wap.p);
        }
@@ -1419,14 +1410,14 @@
       Path dst = getCompletedRecoveredEditsFilePath(wap.p,
           regionMaximumEditLogSeqNum.get(encodedRegionName));
       try {
-        if (!dst.equals(wap.p) && rootFs.exists(dst)) {
-          deleteOneWithFewerEntries(rootFs, wap, dst);
+        if (!dst.equals(wap.p) && walFS.exists(dst)) {
+          deleteOneWithFewerEntries(wap, dst);
         }
         // Skip the unit tests which create a splitter that reads and
         // writes the data without touching disk.
         // TestHLogSplit#testThreading is an example.
-        if (rootFs.exists(wap.p)) {
-          if (!rootFs.rename(wap.p, dst)) {
+        if (walFS.exists(wap.p)) {
+          if (!walFS.rename(wap.p, dst)) {
            throw new IOException("Failed renaming " + wap.p + " to " + dst);
          }
          LOG.info("Rename " + wap.p + " to " + dst);
@@ -1520,12 +1511,12 @@
       if (regionedits == null) {
         return null;
       }
-      FileSystem rootFs = FileSystem.get(conf);
-      if (rootFs.exists(regionedits)) {
+      FileSystem walFs = FSUtils.getWALFileSystem(conf);
+      if (walFs.exists(regionedits)) {
         LOG.warn("Found old edits file. It could be the "
             + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
-            + rootFs.getFileStatus(regionedits).getLen());
-        if (!rootFs.delete(regionedits, false)) {
+            + walFs.getFileStatus(regionedits).getLen());
+        if (!walFs.delete(regionedits, false)) {
          LOG.warn("Failed delete of old {}", regionedits);
        }
      }

AbstractTestDLS.java

@@ -66,7 +66,6 @@ import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.Region;
@@ -221,10 +220,11 @@ public abstract class AbstractTestDLS {
     int count = 0;
     for (RegionInfo hri : regions) {
-      Path tdir = FSUtils.getTableDir(rootdir, table);
+      Path tdir = FSUtils.getWALTableDir(conf, table);
       @SuppressWarnings("deprecation")
       Path editsdir = WALSplitter
-          .getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
+          .getRegionDirRecoveredEditsDir(FSUtils.getWALRegionDir(conf,
+              tableName, hri.getEncodedName()));
       LOG.debug("checking edits dir " + editsdir);
       FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
         @Override

TestHRegion.java

@@ -703,7 +703,7 @@ public class TestHRegion {
       for (HStore store : region.getStores()) {
         maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1);
       }
-      long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
+      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
       assertEquals(maxSeqId, seqId);
       region.getMVCC().advanceTo(seqId);
       Get get = new Get(row);
@@ -755,7 +755,7 @@ public class TestHRegion {
       for (HStore store : region.getStores()) {
         maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1);
       }
-      long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
+      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
       assertEquals(maxSeqId, seqId);
       region.getMVCC().advanceTo(seqId);
       Get get = new Get(row);
@@ -800,7 +800,7 @@ public class TestHRegion {
       for (HStore store : region.getStores()) {
         maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId);
       }
-      long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, null);
+      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, null);
       assertEquals(minSeqId, seqId);
     } finally {
       HBaseTestingUtility.closeRegionAndWAL(this.region);
@@ -858,7 +858,7 @@ public class TestHRegion {
       for (HStore store : region.getStores()) {
         maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1);
       }
-      long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
+      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
       assertEquals(maxSeqId, seqId);
 
       // assert that the files are flushed


@@ -869,7 +869,7 @@ public abstract class AbstractTestWALReplay {
     final TableName tableName = TableName.valueOf(currentTest.getMethodName());
     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
     final Path basedir =
-        FSUtils.getTableDir(this.hbaseRootDir, tableName);
+        FSUtils.getWALTableDir(conf, tableName);
     deleteDir(basedir);
     final byte[] rowName = tableName.getName();
     final int countPerFamily = 10;
@@ -902,7 +902,7 @@ public abstract class AbstractTestWALReplay {
     WALSplitter.splitLogFile(hbaseRootDir, listStatus[0],
         this.fs, this.conf, null, null, null, wals);
     FileStatus[] listStatus1 = this.fs.listStatus(
-        new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(),
+        new Path(FSUtils.getWALTableDir(conf, tableName), new Path(hri.getEncodedName(),
             "recovered.edits")), new PathFilter() {
           @Override
           public boolean accept(Path p) {
@@ -929,13 +929,13 @@ public abstract class AbstractTestWALReplay {
   public void testDatalossWhenInputError() throws Exception {
     final TableName tableName = TableName.valueOf("testDatalossWhenInputError");
     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
-    final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
+    final Path basedir = FSUtils.getWALTableDir(conf, tableName);
     deleteDir(basedir);
     final byte[] rowName = tableName.getName();
     final int countPerFamily = 10;
     final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
     HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
-    Path regionDir = region1.getRegionFileSystem().getRegionDir();
+    Path regionDir = region1.getWALRegionDir();
     HBaseTestingUtility.closeRegionAndWAL(region1);
     WAL wal = createWAL(this.conf, hbaseRootDir, logName);


@@ -49,13 +49,13 @@ public class TestReadWriteSeqIdFiles {
   private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
-  private static FileSystem FS;
+  private static FileSystem walFS;
   private static Path REGION_DIR;

   @BeforeClass
   public static void setUp() throws IOException {
-    FS = FileSystem.getLocal(UTIL.getConfiguration());
+    walFS = FileSystem.getLocal(UTIL.getConfiguration());
     REGION_DIR = UTIL.getDataTestDir();
   }
@@ -66,20 +66,20 @@ public class TestReadWriteSeqIdFiles {
   @Test
   public void test() throws IOException {
-    WALSplitter.writeRegionSequenceIdFile(FS, REGION_DIR, 1000L);
-    assertEquals(1000L, WALSplitter.getMaxRegionSequenceId(FS, REGION_DIR));
-    WALSplitter.writeRegionSequenceIdFile(FS, REGION_DIR, 2000L);
-    assertEquals(2000L, WALSplitter.getMaxRegionSequenceId(FS, REGION_DIR));
+    WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 1000L);
+    assertEquals(1000L, WALSplitter.getMaxRegionSequenceId(walFS, REGION_DIR));
+    WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 2000L);
+    assertEquals(2000L, WALSplitter.getMaxRegionSequenceId(walFS, REGION_DIR));
     // can not write a sequence id which is smaller
     try {
-      WALSplitter.writeRegionSequenceIdFile(FS, REGION_DIR, 1500L);
+      WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 1500L);
     } catch (IOException e) {
       // expected
       LOG.info("Expected error", e);
     }
     Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(REGION_DIR);
-    FileStatus[] files = FSUtils.listStatus(FS, editsdir, new PathFilter() {
+    FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() {
       @Override
       public boolean accept(Path p) {
         return WALSplitter.isSequenceIdFile(p);
@@ -89,7 +89,7 @@ public class TestReadWriteSeqIdFiles {
     assertEquals(1, files.length);
     // verify all seqId files aren't treated as recovered.edits files
-    NavigableSet<Path> recoveredEdits = WALSplitter.getSplitEditFilesSorted(FS, REGION_DIR);
+    NavigableSet<Path> recoveredEdits = WALSplitter.getSplitEditFilesSorted(walFS, REGION_DIR);
     assertEquals(0, recoveredEdits.size());
   }
 }


@@ -181,7 +181,7 @@ public class TestWALFactory {
     final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
     final int howmany = 3;
     RegionInfo[] infos = new RegionInfo[3];
-    Path tabledir = FSUtils.getTableDir(hbaseDir, tableName);
+    Path tabledir = FSUtils.getWALTableDir(conf, tableName);
     fs.mkdirs(tabledir);
     for (int i = 0; i < howmany; i++) {
       infos[i] = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i))


@@ -253,9 +253,9 @@ public class TestWALSplit {
     }
     LOG.debug(Objects.toString(ls));
     LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
-    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
+    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
     LOG.info("Finished splitting out from under zombie.");
-    Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+    Path[] logfiles = getLogForRegion(TABLE_NAME, region);
     assertEquals("wrong number of split files for region", numWriters, logfiles.length);
     int count = 0;
     for (Path logfile: logfiles) {
@@ -440,9 +440,9 @@ public class TestWALSplit {
     generateWALs(1, 10, -1, 0);
     useDifferentDFSClient();
-    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
     Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
-    Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
     assertEquals(1, splitLog.length);
     assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
@@ -456,9 +456,9 @@ public class TestWALSplit {
     generateWALs(1, 10, -1, 100);
     useDifferentDFSClient();
-    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
     Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
-    Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
     assertEquals(1, splitLog.length);
     assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
@@ -483,13 +483,13 @@ public class TestWALSplit {
     writer.close();
     useDifferentDFSClient();
-    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
     Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
     // original log should have 10 test edits, 10 region markers, 1 compaction marker
     assertEquals(21, countWAL(originalLog));
-    Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, hri.getEncodedName());
+    Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName());
     assertEquals(1, splitLog.length);
     assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
@@ -504,10 +504,10 @@ public class TestWALSplit {
   private int splitAndCount(final int expectedFiles, final int expectedEntries)
       throws IOException {
     useDifferentDFSClient();
-    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
     int result = 0;
     for (String region : REGIONS) {
-      Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+      Path[] logfiles = getLogForRegion(TABLE_NAME, region);
       assertEquals(expectedFiles, logfiles.length);
       int count = 0;
       for (Path logfile: logfiles) {
@@ -640,7 +640,7 @@ public class TestWALSplit {
         walDirContents.add(status.getPath().getName());
       }
       useDifferentDFSClient();
-      WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
       return walDirContents;
     } finally {
       conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
@@ -681,9 +681,9 @@ public class TestWALSplit {
     corruptWAL(c1, corruption, true);
     useDifferentDFSClient();
-    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
-    Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
     assertEquals(1, splitLog.length);
     int actualCount = 0;
@@ -717,7 +717,7 @@ public class TestWALSplit {
     conf.setBoolean(HBASE_SKIP_ERRORS, false);
     generateWALs(-1);
     useDifferentDFSClient();
-    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
     FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
     assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
   }
@@ -733,7 +733,7 @@ public class TestWALSplit {
       throws IOException {
     generateWALs(-1);
     useDifferentDFSClient();
-    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
     FileStatus [] statuses = null;
     try {
       statuses = fs.listStatus(WALDIR);
@@ -763,7 +763,7 @@ public class TestWALSplit {
     try {
       InstrumentedLogWriter.activateFailure = true;
-      WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
     } catch (IOException e) {
       assertTrue(e.getMessage().
           contains("This exception is instrumented and should only be thrown for testing"));
@@ -784,7 +784,7 @@ public class TestWALSplit {
     Path regiondir = new Path(TABLEDIR, region);
     fs.delete(regiondir, true);
-    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
     assertFalse(fs.exists(regiondir));
   }
@@ -861,7 +861,7 @@ public class TestWALSplit {
     useDifferentDFSClient();
     try {
-      WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
+      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
       assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
       assertFalse(fs.exists(WALDIR));
     } catch (IOException e) {
@@ -1085,7 +1085,7 @@ public class TestWALSplit {
     Path regiondir = new Path(TABLEDIR, REGION);
     LOG.info("Region directory is" + regiondir);
     fs.delete(regiondir, true);
-    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
     assertFalse(fs.exists(regiondir));
   }
@@ -1098,7 +1098,7 @@ public class TestWALSplit {
     injectEmptyFile(".empty", true);
    useDifferentDFSClient();
-    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
     Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
     assertFalse(fs.exists(tdir));
@@ -1123,7 +1123,7 @@ public class TestWALSplit {
         Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
     useDifferentDFSClient();
-    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
     final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
     assertEquals(1, fs.listStatus(corruptDir).length);
@@ -1151,14 +1151,14 @@ public class TestWALSplit {
         @Override
         protected Writer createWriter(Path logfile)
             throws IOException {
-          Writer writer = wals.createRecoveredEditsWriter(this.fs, logfile);
+          Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile);
           // After creating writer, simulate region's
           // replayRecoveredEditsIfAny() which gets SplitEditFiles of this
           // region and delete them, excluding files with '.temp' suffix.
           NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
           if (files != null && !files.isEmpty()) {
             for (Path file : files) {
-              if (!this.fs.delete(file, false)) {
+              if (!this.walFS.delete(file, false)) {
                 LOG.error("Failed delete of " + file);
               } else {
                 LOG.debug("Deleted recovered.edits file=" + file);
@@ -1237,9 +1237,9 @@ public class TestWALSplit {
-  private Path[] getLogForRegion(Path rootdir, TableName table, String region)
+  private Path[] getLogForRegion(TableName table, String region)
       throws IOException {
-    Path tdir = FSUtils.getTableDir(rootdir, table);
+    Path tdir = FSUtils.getWALTableDir(conf, table);
     @SuppressWarnings("deprecation")
     Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
         Bytes.toString(Bytes.toBytes(region))));