From 504286d55cac2d4439baeefc2f2f93311bfcc5e9 Mon Sep 17 00:00:00 2001 From: Zach York Date: Wed, 27 Jun 2018 16:18:53 -0700 Subject: [PATCH] HBASE-20734 Colocate recovered edits directory with hbase.wal.dir Amending-Author: Reid Chan Signed-off-by: Reid Chan --- .../hadoop/hbase/util/CommonFSUtils.java | 28 +++ .../MergeTableRegionsProcedure.java | 8 +- .../master/assignment/RegionStateStore.java | 8 +- .../assignment/SplitTableRegionProcedure.java | 10 +- .../AbstractStateMachineTableProcedure.java | 7 +- .../procedure/DisableTableProcedure.java | 6 +- .../hadoop/hbase/regionserver/HRegion.java | 162 ++++++++++----- .../apache/hadoop/hbase/wal/WALSplitter.java | 185 +++++++++--------- .../hadoop/hbase/master/AbstractTestDLS.java | 6 +- .../hbase/regionserver/TestHRegion.java | 8 +- .../wal/AbstractTestWALReplay.java | 8 +- .../hbase/wal/TestReadWriteSeqIdFiles.java | 18 +- .../hadoop/hbase/wal/TestWALFactory.java | 2 +- .../apache/hadoop/hbase/wal/TestWALSplit.java | 50 ++--- 14 files changed, 299 insertions(+), 207 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java index 8924098bec2..899c6332838 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java @@ -420,6 +420,34 @@ public abstract class CommonFSUtils { return true; } + /** + * Returns the WAL region directory based on the given table name and region name + * @param conf configuration to determine WALRootDir + * @param tableName Table that the region is under + * @param encodedRegionName Region name used for creating the final region directory + * @return the region directory used to store WALs under the WALRootDir + * @throws IOException if there is an exception determining the WALRootDir + */ + public static Path getWALRegionDir(final Configuration conf, + final TableName tableName, final String encodedRegionName) + throws IOException { + return new Path(getWALTableDir(conf, tableName), + encodedRegionName); + } + + /** + * Returns the Table directory under the WALRootDir for the specified table name + * @param conf configuration used to get the WALRootDir + * @param tableName Table to get the directory for + * @return a path to the WAL table directory for the specified table + * @throws IOException if there is an exception determining the WALRootDir + */ + public static Path getWALTableDir(final Configuration conf, final TableName tableName) + throws IOException { + return new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()), + tableName.getQualifierAsString()); + } + /** * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under * path rootdir diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java index 9296222c0dc..a9ecffd2c17 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java @@ -816,14 +816,16 @@ public class MergeTableRegionsProcedure } private void writeMaxSequenceIdFile(MasterProcedureEnv env) throws IOException { - FileSystem fs = env.getMasterServices().getMasterFileSystem().getFileSystem(); + FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem(); long maxSequenceId = -1L; for (RegionInfo region : regionsToMerge) { maxSequenceId = - Math.max(maxSequenceId, WALSplitter.getMaxRegionSequenceId(fs, getRegionDir(env, region))); + Math.max(maxSequenceId, WALSplitter.getMaxRegionSequenceId( + walFS, getWALRegionDir(env, region))); } if (maxSequenceId > 0) { - WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, mergedRegion), maxSequenceId); + WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, mergedRegion), + maxSequenceId); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java index aeef835dec7..17535e80aba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.assignment; import java.io.IOException; import java.util.Collections; import java.util.List; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellBuilderFactory; import org.apache.hadoop.hbase.CellBuilderType; @@ -34,13 +35,13 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.util.StringUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.yetus.audience.InterfaceAudience; @@ -216,9 +217,10 @@ public class RegionStateStore { } private long getOpenSeqNumForParentRegion(RegionInfo region) throws IOException { - MasterFileSystem mfs = master.getMasterFileSystem(); + FileSystem walFS = master.getMasterWalManager().getFileSystem(); long maxSeqId = - WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region)); + WALSplitter.getMaxRegionSequenceId(walFS, FSUtils.getWALRegionDir( + master.getConfiguration(), region.getTable(), region.getEncodedName())); return maxSeqId > 0 ? maxSeqId + 1 : HConstants.NO_SEQNUM; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java index 2baa056bee2..28828d3801d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java @@ -875,12 +875,14 @@ public class SplitTableRegionProcedure } private void writeMaxSequenceIdFile(MasterProcedureEnv env) throws IOException { - FileSystem fs = env.getMasterServices().getMasterFileSystem().getFileSystem(); + FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem(); long maxSequenceId = - WALSplitter.getMaxRegionSequenceId(fs, getRegionDir(env, getParentRegion())); + WALSplitter.getMaxRegionSequenceId(walFS, getWALRegionDir(env, getParentRegion())); if (maxSequenceId > 0) { - WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, daughter_1_RI), maxSequenceId); - WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, daughter_2_RI), maxSequenceId); + WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughter_1_RI), + maxSequenceId); + WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughter_2_RI), + maxSequenceId); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java index 4c77f6b969d..2035116ce17 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.master.TableStateManager; import org.apache.hadoop.hbase.master.assignment.RegionStates; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.yetus.audience.InterfaceAudience; /** @@ -130,8 +131,10 @@ public abstract class AbstractStateMachineTableProcedure } } - protected final Path getRegionDir(MasterProcedureEnv env, RegionInfo region) throws IOException { - return env.getMasterServices().getMasterFileSystem().getRegionDir(region); + protected final Path getWALRegionDir(MasterProcedureEnv env, RegionInfo region) + throws IOException { + return FSUtils.getWALRegionDir(env.getMasterConfiguration(), + region.getTable(), region.getEncodedName()); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java index dd1034ec0af..e4a57d5a949 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MetaTableAccessor; @@ -30,7 +31,6 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.constraint.ConstraintException; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; -import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.master.TableStateManager; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -113,13 +113,13 @@ public class DisableTableProcedure case DISABLE_TABLE_ADD_REPLICATION_BARRIER: if (env.getMasterServices().getTableDescriptors().get(tableName) .hasGlobalReplicationScope()) { - MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); + FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem(); try (BufferedMutator mutator = env.getMasterServices().getConnection() .getBufferedMutator(TableName.META_TABLE_NAME)) { for (RegionInfo region : env.getAssignmentManager().getRegionStates() .getRegionsOfTable(tableName)) { long maxSequenceId = - WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region)); + WALSplitter.getMaxRegionSequenceId(walFS, getWALRegionDir(env, region)); long openSeqNum = maxSequenceId > 0 ? maxSequenceId + 1 : HConstants.NO_SEQNUM; mutator.mutate(MetaTableAccessor.makePutForReplicationBarrier(region, openSeqNum, EnvironmentEdgeManager.currentTime())); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index f713df0daec..97c60c3c561 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -330,6 +330,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private final int rowLockWaitDuration; static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000; + private Path regionDir; + private FileSystem walFS; + // The internal wait duration to acquire a lock before read/update // from the region. It is not per row. The purpose of this wait time // is to avoid waiting a long time while the region is busy, so that @@ -943,7 +946,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi stores.forEach(HStore::startReplayingFromWAL); // Recover any edits if available. maxSeqId = Math.max(maxSeqId, - replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status)); + replayRecoveredEditsIfAny(maxSeqIdInStores, reporter, status)); // Make sure mvcc is up to max. this.mvcc.advanceTo(maxSeqId); } finally { @@ -986,14 +989,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Use maximum of log sequenceid or that which was found in stores // (particularly if no recovered edits, seqid will be -1). long maxSeqIdFromFile = - WALSplitter.getMaxRegionSequenceId(fs.getFileSystem(), fs.getRegionDir()); + WALSplitter.getMaxRegionSequenceId(getWalFileSystem(), getWALRegionDir()); long nextSeqId = Math.max(maxSeqId, maxSeqIdFromFile) + 1; // The openSeqNum will always be increase even for read only region, as we rely on it to // determine whether a region has been successfully reopend, so here we always need to update // the max sequence id file. if (RegionReplicaUtil.isDefaultReplica(getRegionInfo())) { LOG.debug("writing seq id for {}", this.getRegionInfo().getEncodedName()); - WALSplitter.writeRegionSequenceIdFile(fs.getFileSystem(), fs.getRegionDir(), nextSeqId - 1); + WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(), nextSeqId - 1); } LOG.info("Opened {}; next sequenceid={}", this.getRegionInfo().getShortNameToLog(), nextSeqId); @@ -1140,11 +1143,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionEventDesc, mvcc); - // Store SeqId in HDFS when a region closes + // Store SeqId in WAL FileSystem when a region closes // checking region folder exists is due to many tests which delete the table folder while a // table is still online - if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) { - WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(), + if (getWalFileSystem().exists(getWALRegionDir())) { + WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(), mvcc.getReadPoint()); } } @@ -1863,6 +1866,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return this.fs; } + /** @return the WAL {@link HRegionFileSystem} used by this region */ + HRegionFileSystem getRegionWALFileSystem() throws IOException { + return new HRegionFileSystem(conf, getWalFileSystem(), + FSUtils.getWALTableDir(conf, htableDescriptor.getTableName()), fs.getRegionInfo()); + } + + /** @return the WAL {@link FileSystem} being used by this region */ + FileSystem getWalFileSystem() throws IOException { + if (walFS == null) { + walFS = FSUtils.getWALFileSystem(conf); + } + return walFS; + } + + /** + * @return the Region directory under WALRootDirectory + * @throws IOException if there is an error getting WALRootDir + */ + @VisibleForTesting + public Path getWALRegionDir() throws IOException { + if (regionDir == null) { + regionDir = FSUtils.getWALRegionDir(conf, getRegionInfo().getTable(), + getRegionInfo().getEncodedName()); + } + return regionDir; + } + @Override public long getEarliestFlushTimeForAllStores() { return Collections.min(lastStoreFlushTimeMap.values()); @@ -4465,8 +4495,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * recovered edits log or minSeqId if nothing added from editlogs. * @throws IOException */ - protected long replayRecoveredEditsIfAny(final Path regiondir, - Map maxSeqIdInStores, + protected long replayRecoveredEditsIfAny(Map maxSeqIdInStores, final CancelableProgressable reporter, final MonitoredTask status) throws IOException { long minSeqIdForTheRegion = -1; @@ -4477,14 +4506,75 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } long seqid = minSeqIdForTheRegion; - FileSystem fs = this.fs.getFileSystem(); - NavigableSet files = WALSplitter.getSplitEditFilesSorted(fs, regiondir); - if (LOG.isDebugEnabled()) { - LOG.debug("Found " + (files == null ? 0 : files.size()) - + " recovered edits file(s) under " + regiondir); + FileSystem walFS = getWalFileSystem(); + FileSystem rootFS = getFilesystem(); + Path regionDir = getWALRegionDir(); + Path defaultRegionDir = getRegionDir(FSUtils.getRootDir(conf), getRegionInfo()); + + // This is to ensure backwards compatability with HBASE-20723 where recovered edits can appear + // under the root dir even if walDir is set. + NavigableSet filesUnderRootDir = null; + if (!regionDir.equals(defaultRegionDir)) { + filesUnderRootDir = + WALSplitter.getSplitEditFilesSorted(rootFS, defaultRegionDir); + seqid = Math.max(seqid, + replayRecoveredEditsForPaths(minSeqIdForTheRegion, rootFS, filesUnderRootDir, reporter, + defaultRegionDir)); } - if (files == null || files.isEmpty()) return seqid; + NavigableSet files = WALSplitter.getSplitEditFilesSorted(walFS, regionDir); + seqid = Math.max(seqid, replayRecoveredEditsForPaths(minSeqIdForTheRegion, walFS, + files, reporter, regionDir)); + + if (seqid > minSeqIdForTheRegion) { + // Then we added some edits to memory. Flush and cleanup split edit files. + internalFlushcache(null, seqid, stores.values(), status, false, FlushLifeCycleTracker.DUMMY); + } + // Now delete the content of recovered edits. We're done w/ them. + if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) { + // For debugging data loss issues! + // If this flag is set, make use of the hfile archiving by making recovered.edits a fake + // column family. Have to fake out file type too by casting our recovered.edits as storefiles + String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regionDir).getName(); + Set fakeStoreFiles = new HashSet<>(files.size()); + for (Path file: files) { + fakeStoreFiles.add( + new HStoreFile(walFS, file, this.conf, null, null, true)); + } + getRegionWALFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles); + } else { + if (filesUnderRootDir != null) { + for (Path file : filesUnderRootDir) { + if (!rootFS.delete(file, false)) { + LOG.error("Failed delete of {} from under the root directory.", file); + } else { + LOG.debug("Deleted recovered.edits under root directory. file=" + file); + } + } + } + for (Path file: files) { + if (!walFS.delete(file, false)) { + LOG.error("Failed delete of " + file); + } else { + LOG.debug("Deleted recovered.edits file=" + file); + } + } + } + return seqid; + } + + private long replayRecoveredEditsForPaths(long minSeqIdForTheRegion, FileSystem fs, + final NavigableSet files, final CancelableProgressable reporter, final Path regionDir) + throws IOException { + long seqid = minSeqIdForTheRegion; + if (LOG.isDebugEnabled()) { + LOG.debug("Found " + (files == null ? 0 : files.size()) + + " recovered edits file(s) under " + regionDir); + } + + if (files == null || files.isEmpty()) { + return minSeqIdForTheRegion; + } for (Path edits: files) { if (edits == null || !fs.exists(edits)) { @@ -4499,8 +4589,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (maxSeqId <= minSeqIdForTheRegion) { if (LOG.isDebugEnabled()) { String msg = "Maximum sequenceid for this wal is " + maxSeqId - + " and minimum sequenceid for the region is " + minSeqIdForTheRegion - + ", skipped the whole file, path=" + edits; + + " and minimum sequenceid for the region is " + minSeqIdForTheRegion + + ", skipped the whole file, path=" + edits; LOG.debug(msg); } continue; @@ -4509,7 +4599,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi try { // replay the edits. Replay can return -1 if everything is skipped, only update // if seqId is greater - seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter)); + seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter, fs)); } catch (IOException e) { boolean skipErrors = conf.getBoolean( HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, @@ -4519,10 +4609,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (conf.get("hbase.skip.errors") != null) { LOG.warn( "The property 'hbase.skip.errors' has been deprecated. Please use " + - HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead."); + HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead."); } if (skipErrors) { - Path p = WALSplitter.moveAsideBadEditsFile(fs, edits); + Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits); LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + "=true so continuing. Renamed " + edits + " as " + p, e); @@ -4531,31 +4621,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } } - if (seqid > minSeqIdForTheRegion) { - // Then we added some edits to memory. Flush and cleanup split edit files. - internalFlushcache(null, seqid, stores.values(), status, false, FlushLifeCycleTracker.DUMMY); - } - // Now delete the content of recovered edits. We're done w/ them. - if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) { - // For debugging data loss issues! - // If this flag is set, make use of the hfile archiving by making recovered.edits a fake - // column family. Have to fake out file type too by casting our recovered.edits as storefiles - String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName(); - Set fakeStoreFiles = new HashSet<>(files.size()); - for (Path file: files) { - fakeStoreFiles.add( - new HStoreFile(getRegionFileSystem().getFileSystem(), file, this.conf, null, null, true)); - } - getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles); - } else { - for (Path file: files) { - if (!fs.delete(file, false)) { - LOG.error("Failed delete of " + file); - } else { - LOG.debug("Deleted recovered.edits file=" + file); - } - } - } return seqid; } @@ -4569,12 +4634,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @throws IOException */ private long replayRecoveredEdits(final Path edits, - Map maxSeqIdInStores, final CancelableProgressable reporter) + Map maxSeqIdInStores, final CancelableProgressable reporter, FileSystem fs) throws IOException { String msg = "Replaying edits from " + edits; LOG.info(msg); MonitoredTask status = TaskMonitor.get().createStatus(msg); - FileSystem fs = this.fs.getFileSystem(); status.setStatus("Opening recovered edits"); WAL.Reader reader = null; @@ -4728,7 +4792,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi coprocessorHost.postReplayWALs(this.getRegionInfo(), edits); } } catch (EOFException eof) { - Path p = WALSplitter.moveAsideBadEditsFile(fs, edits); + Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits); msg = "EnLongAddered EOF. Most likely due to Master failure during " + "wal splitting, so we have this data in another edit. " + "Continuing, but renaming " + edits + " as " + p; @@ -4738,7 +4802,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // If the IOE resulted from bad file format, // then this problem is idempotent and retrying won't help if (ioe.getCause() instanceof ParseException) { - Path p = WALSplitter.moveAsideBadEditsFile(fs, edits); + Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits); msg = "File corruption enLongAddered! " + "Continuing, but renaming " + edits + " as " + p; LOG.warn(msg, ioe); @@ -8022,7 +8086,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 51 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + + 53 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + (14 * Bytes.SIZEOF_LONG) + 3 * Bytes.SIZEOF_BOOLEAN); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index abf9a55f6c0..4a18ce5fcf3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -116,7 +116,7 @@ public class WALSplitter { // Parameters for split process protected final Path walDir; - protected final FileSystem fs; + protected final FileSystem walFS; protected final Configuration conf; // Major subcomponents of the split process. @@ -149,14 +149,14 @@ public class WALSplitter { @VisibleForTesting WALSplitter(final WALFactory factory, Configuration conf, Path walDir, - FileSystem fs, LastSequenceId idChecker, + FileSystem walFS, LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination) { this.conf = HBaseConfiguration.create(conf); String codecClassName = conf .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName); this.walDir = walDir; - this.fs = fs; + this.walFS = walFS; this.sequenceIdChecker = idChecker; this.splitLogWorkerCoordination = splitLogWorkerCoordination; @@ -186,11 +186,11 @@ public class WALSplitter { *

* @return false if it is interrupted by the progress-able. */ - public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem fs, + public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS, Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory) throws IOException { - WALSplitter s = new WALSplitter(factory, conf, walDir, fs, idChecker, + WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, idChecker, splitLogWorkerCoordination); return s.splitLogFile(logfile, reporter); } @@ -201,13 +201,13 @@ public class WALSplitter { // which uses this method to do log splitting. @VisibleForTesting public static List split(Path rootDir, Path logDir, Path oldLogDir, - FileSystem fs, Configuration conf, final WALFactory factory) throws IOException { + FileSystem walFS, Configuration conf, final WALFactory factory) throws IOException { final FileStatus[] logfiles = SplitLogManager.getFileList(conf, Collections.singletonList(logDir), null); List splits = new ArrayList<>(); if (ArrayUtils.isNotEmpty(logfiles)) { for (FileStatus logfile: logfiles) { - WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null); + WALSplitter s = new WALSplitter(factory, conf, rootDir, walFS, null, null); if (s.splitLogFile(logfile, null)) { finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf); if (s.outputSink.splits != null) { @@ -216,7 +216,7 @@ public class WALSplitter { } } } - if (!fs.delete(logDir, true)) { + if (!walFS.delete(logDir, true)) { throw new IOException("Unable to delete src dir: " + logDir); } return splits; @@ -322,10 +322,10 @@ public class WALSplitter { LOG.warn("Could not parse, corrupted WAL={}", logPath, e); if (splitLogWorkerCoordination != null) { // Some tests pass in a csm of null. - splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), fs); + splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS); } else { // for tests only - ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), fs); + ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS); } isCorrupted = true; } catch (IOException e) { @@ -373,31 +373,30 @@ public class WALSplitter { */ public static void finishSplitLogFile(String logfile, Configuration conf) throws IOException { - Path rootdir = FSUtils.getWALRootDir(conf); - Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME); + Path walDir = FSUtils.getWALRootDir(conf); + Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME); Path logPath; - if (FSUtils.isStartingWithPath(rootdir, logfile)) { + if (FSUtils.isStartingWithPath(walDir, logfile)) { logPath = new Path(logfile); } else { - logPath = new Path(rootdir, logfile); + logPath = new Path(walDir, logfile); } - finishSplitLogFile(rootdir, oldLogDir, logPath, conf); + finishSplitLogFile(walDir, oldLogDir, logPath, conf); } - private static void finishSplitLogFile(Path rootdir, Path oldLogDir, + private static void finishSplitLogFile(Path walDir, Path oldLogDir, Path logPath, Configuration conf) throws IOException { List processedLogs = new ArrayList<>(); List corruptedLogs = new ArrayList<>(); - FileSystem fs; - fs = rootdir.getFileSystem(conf); - if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) { + FileSystem walFS = walDir.getFileSystem(conf); + if (ZKSplitLog.isCorrupted(walDir, logPath.getName(), walFS)) { corruptedLogs.add(logPath); } else { processedLogs.add(logPath); } - archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf); - Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName()); - fs.delete(stagingDir, true); + archiveLogs(corruptedLogs, processedLogs, oldLogDir, walFS, conf); + Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, logPath.getName()); + walFS.delete(stagingDir, true); } /** @@ -408,30 +407,30 @@ public class WALSplitter { * @param corruptedLogs * @param processedLogs * @param oldLogDir - * @param fs + * @param walFS WAL FileSystem to archive files on. * @param conf * @throws IOException */ private static void archiveLogs( final List corruptedLogs, final List processedLogs, final Path oldLogDir, - final FileSystem fs, final Configuration conf) throws IOException { + final FileSystem walFS, final Configuration conf) throws IOException { final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) { LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}", corruptDir); } - if (!fs.mkdirs(corruptDir)) { + if (!walFS.mkdirs(corruptDir)) { LOG.info("Unable to mkdir {}", corruptDir); } - fs.mkdirs(oldLogDir); + walFS.mkdirs(oldLogDir); // this method can get restarted or called multiple times for archiving // the same log files. for (Path corrupted : corruptedLogs) { Path p = new Path(corruptDir, corrupted.getName()); - if (fs.exists(corrupted)) { - if (!fs.rename(corrupted, p)) { + if (walFS.exists(corrupted)) { + if (!walFS.rename(corrupted, p)) { LOG.warn("Unable to move corrupted log {} to {}", corrupted, p); } else { LOG.warn("Moved corrupted log {} to {}", corrupted, p); @@ -441,8 +440,8 @@ public class WALSplitter { for (Path p : processedLogs) { Path newPath = AbstractFSWAL.getWALArchivePath(oldLogDir, p); - if (fs.exists(p)) { - if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) { + if (walFS.exists(p)) { + if (!FSUtils.renameAndSetModifyTime(walFS, p, newPath)) { LOG.warn("Unable to move {} to {}", p, newPath); } else { LOG.info("Archived processed log {} to {}", p, newPath); @@ -468,35 +467,29 @@ public class WALSplitter { @VisibleForTesting static Path getRegionSplitEditsPath(final Entry logEntry, String fileNameBeingSplit, String tmpDirName, Configuration conf) throws IOException { - FileSystem fs = FileSystem.get(conf); - Path rootDir = FSUtils.getRootDir(conf); - Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTableName()); + FileSystem walFS = FSUtils.getWALFileSystem(conf); + Path tableDir = FSUtils.getWALTableDir(conf, logEntry.getKey().getTableName()); String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName()); - Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName); - Path dir = getRegionDirRecoveredEditsDir(regiondir); + Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionName); + Path dir = getRegionDirRecoveredEditsDir(regionDir); - if (!fs.exists(regiondir)) { - LOG.info("This region's directory does not exist: {}." - + "It is very likely that it was already split so it is " - + "safe to discard those edits.", regiondir); - return null; - } - if (fs.exists(dir) && fs.isFile(dir)) { + + if (walFS.exists(dir) && walFS.isFile(dir)) { Path tmp = new Path(tmpDirName); - if (!fs.exists(tmp)) { - fs.mkdirs(tmp); + if (!walFS.exists(tmp)) { + walFS.mkdirs(tmp); } tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName); LOG.warn("Found existing old file: {}. It could be some " + "leftover of an old installation. It should be a folder instead. " + "So moving it to {}", dir, tmp); - if (!fs.rename(dir, tmp)) { + if (!walFS.rename(dir, tmp)) { LOG.warn("Failed to sideline old file {}", dir); } } - if (!fs.exists(dir) && !fs.mkdirs(dir)) { + if (!walFS.exists(dir) && !walFS.mkdirs(dir)) { LOG.warn("mkdir failed on {}", dir); } // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now. @@ -534,34 +527,34 @@ public class WALSplitter { private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp"; /** - * @param regiondir + * @param regionDir * This regions directory in the filesystem. * @return The directory that holds recovered edits files for the region - * regiondir + * regionDir */ - public static Path getRegionDirRecoveredEditsDir(final Path regiondir) { - return new Path(regiondir, HConstants.RECOVERED_EDITS_DIR); + public static Path getRegionDirRecoveredEditsDir(final Path regionDir) { + return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR); } /** * Check whether there is recovered.edits in the region dir - * @param fs FileSystem + * @param walFS FileSystem * @param conf conf * @param regionInfo the region to check * @throws IOException IOException * @return true if recovered.edits exist in the region dir */ - public static boolean hasRecoveredEdits(final FileSystem fs, + public static boolean hasRecoveredEdits(final FileSystem walFS, final Configuration conf, final RegionInfo regionInfo) throws IOException { // No recovered.edits for non default replica regions if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { return false; } - Path rootDir = FSUtils.getRootDir(conf); //Only default replica region can reach here, so we can use regioninfo //directly without converting it to default replica's regioninfo. - Path regionDir = HRegion.getRegionDir(rootDir, regionInfo); - NavigableSet files = getSplitEditFilesSorted(fs, regionDir); + Path regionDir = FSUtils.getWALRegionDir(conf, regionInfo.getTable(), + regionInfo.getEncodedName()); + NavigableSet files = getSplitEditFilesSorted(walFS, regionDir); return files != null && !files.isEmpty(); } @@ -570,19 +563,19 @@ public class WALSplitter { * Returns sorted set of edit files made by splitter, excluding files * with '.temp' suffix. * - * @param fs - * @param regiondir - * @return Files in passed regiondir as a sorted set. + * @param walFS WAL FileSystem used to retrieving split edits files. + * @param regionDir WAL region dir to look for recovered edits files under. + * @return Files in passed regionDir as a sorted set. * @throws IOException */ - public static NavigableSet getSplitEditFilesSorted(final FileSystem fs, - final Path regiondir) throws IOException { + public static NavigableSet getSplitEditFilesSorted(final FileSystem walFS, + final Path regionDir) throws IOException { NavigableSet filesSorted = new TreeSet<>(); - Path editsdir = getRegionDirRecoveredEditsDir(regiondir); - if (!fs.exists(editsdir)) { + Path editsdir = getRegionDirRecoveredEditsDir(regionDir); + if (!walFS.exists(editsdir)) { return filesSorted; } - FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() { + FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() { @Override public boolean accept(Path p) { boolean result = false; @@ -592,7 +585,7 @@ public class WALSplitter { // In particular, on error, we'll move aside the bad edit file giving // it a timestamp suffix. See moveAsideBadEditsFile. Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName()); - result = fs.isFile(p) && m.matches(); + result = walFS.isFile(p) && m.matches(); // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX, // because it means splitwal thread is writting this file. if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) { @@ -617,17 +610,17 @@ public class WALSplitter { /** * Move aside a bad edits file. * - * @param fs + * @param walFS WAL FileSystem used to rename bad edits file. * @param edits * Edits file to move aside. * @return The name of the moved aside file. * @throws IOException */ - public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits) + public static Path moveAsideBadEditsFile(final FileSystem walFS, final Path edits) throws IOException { Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." + System.currentTimeMillis()); - if (!fs.rename(edits, moveAsideName)) { + if (!walFS.rename(edits, moveAsideName)) { LOG.warn("Rename failed from {} to {}", edits, moveAsideName); } return moveAsideName; @@ -646,12 +639,13 @@ public class WALSplitter { || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX); } - private static FileStatus[] getSequenceIdFiles(FileSystem fs, Path regionDir) throws IOException { + private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir) + throws IOException { // TODO: Why are we using a method in here as part of our normal region open where // there is no splitting involved? Fix. St.Ack 01/20/2017. Path editsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir); try { - FileStatus[] files = fs.listStatus(editsDir, WALSplitter::isSequenceIdFile); + FileStatus[] files = walFS.listStatus(editsDir, WALSplitter::isSequenceIdFile); return files != null ? files : new FileStatus[0]; } catch (FileNotFoundException e) { return new FileStatus[0]; @@ -675,16 +669,16 @@ public class WALSplitter { /** * Get the max sequence id which is stored in the region directory. -1 if none. */ - public static long getMaxRegionSequenceId(FileSystem fs, Path regionDir) throws IOException { - return getMaxSequenceId(getSequenceIdFiles(fs, regionDir)); + public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException { + return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir)); } /** * Create a file with name as region's max sequence id */ - public static void writeRegionSequenceIdFile(FileSystem fs, Path regionDir, long newMaxSeqId) + public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId) throws IOException { - FileStatus[] files = getSequenceIdFiles(fs, regionDir); + FileStatus[] files = getSequenceIdFiles(walFS, regionDir); long maxSeqId = getMaxSequenceId(files); if (maxSeqId > newMaxSeqId) { throw new IOException("The new max sequence id " + newMaxSeqId + @@ -695,7 +689,7 @@ public class WALSplitter { newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX); if (newMaxSeqId != maxSeqId) { try { - if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) { + if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) { throw new IOException("Failed to create SeqId file:" + newSeqIdFile); } LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId, @@ -707,7 +701,7 @@ public class WALSplitter { // remove old ones for (FileStatus status : files) { if (!newSeqIdFile.equals(status.getPath())) { - fs.delete(status.getPath(), false); + walFS.delete(status.getPath(), false); } } } @@ -734,7 +728,7 @@ public class WALSplitter { } try { - FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter); + FSUtils.getInstance(walFS, conf).recoverFileLease(walFS, path, conf, reporter); try { in = getReader(path, reporter); } catch (EOFException e) { @@ -801,7 +795,7 @@ public class WALSplitter { */ protected Writer createWriter(Path logfile) throws IOException { - return walFactory.createRecoveredEditsWriter(fs, logfile); + return walFactory.createRecoveredEditsWriter(walFS, logfile); } /** @@ -809,7 +803,7 @@ public class WALSplitter { * @return new Reader instance, caller should close */ protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException { - return walFactory.createReader(fs, curLogFile, reporter); + return walFactory.createReader(walFS, curLogFile, reporter); } /** @@ -1284,10 +1278,10 @@ public class WALSplitter { } // delete the one with fewer wal entries - private void deleteOneWithFewerEntries(FileSystem rootFs, WriterAndPath wap, Path dst) + private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst) throws IOException { long dstMinLogSeqNum = -1L; - try (WAL.Reader reader = walFactory.createReader(fs, dst)) { + try (WAL.Reader reader = walFactory.createReader(walFS, dst)) { WAL.Entry entry = reader.next(); if (entry != null) { dstMinLogSeqNum = entry.getKey().getSequenceId(); @@ -1299,15 +1293,15 @@ public class WALSplitter { if (wap.minLogSeqNum < dstMinLogSeqNum) { LOG.warn("Found existing old edits file. It could be the result of a previous failed" + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length=" - + fs.getFileStatus(dst).getLen()); - if (!fs.delete(dst, false)) { + + walFS.getFileStatus(dst).getLen()); + if (!walFS.delete(dst, false)) { LOG.warn("Failed deleting of old {}", dst); throw new IOException("Failed deleting of old " + dst); } } else { LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p - + ", length=" + rootFs.getFileStatus(wap.p).getLen()); - if (!rootFs.delete(wap.p, false)) { + + ", length=" + walFS.getFileStatus(wap.p).getLen()); + if (!walFS.delete(wap.p, false)) { LOG.warn("Failed deleting of {}", wap.p); throw new IOException("Failed deleting of " + wap.p); } @@ -1391,10 +1385,7 @@ public class WALSplitter { Path closeWriter(String encodedRegionName, WriterAndPath wap, List thrown) throws IOException{ - if (LOG.isTraceEnabled()) { - LOG.trace("Closing " + wap.p); - } - FileSystem rootFs = FileSystem.get(conf); + LOG.trace("Closing " + wap.p); try { wap.w.close(); } catch (IOException ioe) { @@ -1409,7 +1400,7 @@ public class WALSplitter { } if (wap.editsWritten == 0) { // just remove the empty recovered.edits file - if (rootFs.exists(wap.p) && !rootFs.delete(wap.p, false)) { + if (walFS.exists(wap.p) && !walFS.delete(wap.p, false)) { LOG.warn("Failed deleting empty " + wap.p); throw new IOException("Failed deleting empty " + wap.p); } @@ -1419,14 +1410,14 @@ public class WALSplitter { Path dst = getCompletedRecoveredEditsFilePath(wap.p, regionMaximumEditLogSeqNum.get(encodedRegionName)); try { - if (!dst.equals(wap.p) && rootFs.exists(dst)) { - deleteOneWithFewerEntries(rootFs, wap, dst); + if (!dst.equals(wap.p) && walFS.exists(dst)) { + deleteOneWithFewerEntries(wap, dst); } // Skip the unit tests which create a splitter that reads and // writes the data without touching disk. // TestHLogSplit#testThreading is an example. - if (rootFs.exists(wap.p)) { - if (!rootFs.rename(wap.p, dst)) { + if (walFS.exists(wap.p)) { + if (!walFS.rename(wap.p, dst)) { throw new IOException("Failed renaming " + wap.p + " to " + dst); } LOG.info("Rename " + wap.p + " to " + dst); @@ -1520,12 +1511,12 @@ public class WALSplitter { if (regionedits == null) { return null; } - FileSystem rootFs = FileSystem.get(conf); - if (rootFs.exists(regionedits)) { + FileSystem walFs = FSUtils.getWALFileSystem(conf); + if (walFs.exists(regionedits)) { LOG.warn("Found old edits file. It could be the " + "result of a previous failed split attempt. Deleting " + regionedits + ", length=" - + rootFs.getFileStatus(regionedits).getLen()); - if (!rootFs.delete(regionedits, false)) { + + walFs.getFileStatus(regionedits).getLen()); + if (!walFs.delete(regionedits, false)) { LOG.warn("Failed delete of old {}", regionedits); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java index bc4d32c3122..f36b38c3cf1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java @@ -66,7 +66,6 @@ import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; import org.apache.hadoop.hbase.master.assignment.RegionStates; -import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.Region; @@ -221,10 +220,11 @@ public abstract class AbstractTestDLS { int count = 0; for (RegionInfo hri : regions) { - Path tdir = FSUtils.getTableDir(rootdir, table); + Path tdir = FSUtils.getWALTableDir(conf, table); @SuppressWarnings("deprecation") Path editsdir = WALSplitter - .getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName())); + .getRegionDirRecoveredEditsDir(FSUtils.getWALRegionDir(conf, + tableName, hri.getEncodedName())); LOG.debug("checking edits dir " + editsdir); FileStatus[] files = fs.listStatus(editsdir, new PathFilter() { @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 8f231ddceb0..3f9bf6afe43 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -703,7 +703,7 @@ public class TestHRegion { for (HStore store : region.getStores()) { maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1); } - long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status); + long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status); assertEquals(maxSeqId, seqId); region.getMVCC().advanceTo(seqId); Get get = new Get(row); @@ -755,7 +755,7 @@ public class TestHRegion { for (HStore store : region.getStores()) { maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1); } - long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status); + long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status); assertEquals(maxSeqId, seqId); region.getMVCC().advanceTo(seqId); Get get = new Get(row); @@ -800,7 +800,7 @@ public class TestHRegion { for (HStore store : region.getStores()) { maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId); } - long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, null); + long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, null); assertEquals(minSeqId, seqId); } finally { HBaseTestingUtility.closeRegionAndWAL(this.region); @@ -858,7 +858,7 @@ public class TestHRegion { for (HStore store : region.getStores()) { maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1); } - long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status); + long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status); assertEquals(maxSeqId, seqId); // assert that the files are flushed diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index 0a707ebbb4c..9b09f33aba9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -869,7 +869,7 @@ public abstract class AbstractTestWALReplay { final TableName tableName = TableName.valueOf(currentTest.getMethodName()); final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); final Path basedir = - FSUtils.getTableDir(this.hbaseRootDir, tableName); + FSUtils.getWALTableDir(conf, tableName); deleteDir(basedir); final byte[] rowName = tableName.getName(); final int countPerFamily = 10; @@ -902,7 +902,7 @@ public abstract class AbstractTestWALReplay { WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf, null, null, null, wals); FileStatus[] listStatus1 = this.fs.listStatus( - new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(), + new Path(FSUtils.getWALTableDir(conf, tableName), new Path(hri.getEncodedName(), "recovered.edits")), new PathFilter() { @Override public boolean accept(Path p) { @@ -929,13 +929,13 @@ public abstract class AbstractTestWALReplay { public void testDatalossWhenInputError() throws Exception { final TableName tableName = TableName.valueOf("testDatalossWhenInputError"); final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); - final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName); + final Path basedir = FSUtils.getWALTableDir(conf, tableName); deleteDir(basedir); final byte[] rowName = tableName.getName(); final int countPerFamily = 10; final HTableDescriptor htd = createBasic1FamilyHTD(tableName); HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); - Path regionDir = region1.getRegionFileSystem().getRegionDir(); + Path regionDir = region1.getWALRegionDir(); HBaseTestingUtility.closeRegionAndWAL(region1); WAL wal = createWAL(this.conf, hbaseRootDir, logName); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java index 6e3aa105f70..8ae638ce9a4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java @@ -49,13 +49,13 @@ public class TestReadWriteSeqIdFiles { private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); - private static FileSystem FS; + private static FileSystem walFS; private static Path REGION_DIR; @BeforeClass public static void setUp() throws IOException { - FS = FileSystem.getLocal(UTIL.getConfiguration()); + walFS = FileSystem.getLocal(UTIL.getConfiguration()); REGION_DIR = UTIL.getDataTestDir(); } @@ -66,20 +66,20 @@ public class TestReadWriteSeqIdFiles { @Test public void test() throws IOException { - WALSplitter.writeRegionSequenceIdFile(FS, REGION_DIR, 1000L); - assertEquals(1000L, WALSplitter.getMaxRegionSequenceId(FS, REGION_DIR)); - WALSplitter.writeRegionSequenceIdFile(FS, REGION_DIR, 2000L); - assertEquals(2000L, WALSplitter.getMaxRegionSequenceId(FS, REGION_DIR)); + WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 1000L); + assertEquals(1000L, WALSplitter.getMaxRegionSequenceId(walFS, REGION_DIR)); + WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 2000L); + assertEquals(2000L, WALSplitter.getMaxRegionSequenceId(walFS, REGION_DIR)); // can not write a sequence id which is smaller try { - WALSplitter.writeRegionSequenceIdFile(FS, REGION_DIR, 1500L); + WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 1500L); } catch (IOException e) { // expected LOG.info("Expected error", e); } Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(REGION_DIR); - FileStatus[] files = FSUtils.listStatus(FS, editsdir, new PathFilter() { + FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() { @Override public boolean accept(Path p) { return WALSplitter.isSequenceIdFile(p); @@ -89,7 +89,7 @@ public class TestReadWriteSeqIdFiles { assertEquals(1, files.length); // verify all seqId files aren't treated as recovered.edits files - NavigableSet recoveredEdits = WALSplitter.getSplitEditFilesSorted(FS, REGION_DIR); + NavigableSet recoveredEdits = WALSplitter.getSplitEditFilesSorted(walFS, REGION_DIR); assertEquals(0, recoveredEdits.size()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java index b262347104d..fe2626d0a15 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java @@ -181,7 +181,7 @@ public class TestWALFactory { final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); final int howmany = 3; RegionInfo[] infos = new RegionInfo[3]; - Path tabledir = FSUtils.getTableDir(hbaseDir, tableName); + Path tabledir = FSUtils.getWALTableDir(conf, tableName); fs.mkdirs(tabledir); for (int i = 0; i < howmany; i++) { infos[i] = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i)) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java index 7a5abfd98c4..e6644f07dda 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java @@ -253,9 +253,9 @@ public class TestWALSplit { } LOG.debug(Objects.toString(ls)); LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files."); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf2, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals); LOG.info("Finished splitting out from under zombie."); - Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region); + Path[] logfiles = getLogForRegion(TABLE_NAME, region); assertEquals("wrong number of split files for region", numWriters, logfiles.length); int count = 0; for (Path logfile: logfiles) { @@ -440,9 +440,9 @@ public class TestWALSplit { generateWALs(1, 10, -1, 0); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); - Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION); + Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); assertEquals(1, splitLog.length); assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0])); @@ -456,9 +456,9 @@ public class TestWALSplit { generateWALs(1, 10, -1, 100); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); - Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION); + Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); assertEquals(1, splitLog.length); assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0])); @@ -483,13 +483,13 @@ public class TestWALSplit { writer.close(); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); // original log should have 10 test edits, 10 region markers, 1 compaction marker assertEquals(21, countWAL(originalLog)); - Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, hri.getEncodedName()); + Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName()); assertEquals(1, splitLog.length); assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0])); @@ -504,10 +504,10 @@ public class TestWALSplit { private int splitAndCount(final int expectedFiles, final int expectedEntries) throws IOException { useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); int result = 0; for (String region : REGIONS) { - Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region); + Path[] logfiles = getLogForRegion(TABLE_NAME, region); assertEquals(expectedFiles, logfiles.length); int count = 0; for (Path logfile: logfiles) { @@ -640,7 +640,7 @@ public class TestWALSplit { walDirContents.add(status.getPath().getName()); } useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); return walDirContents; } finally { conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass, @@ -681,9 +681,9 @@ public class TestWALSplit { corruptWAL(c1, corruption, true); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); - Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION); + Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); assertEquals(1, splitLog.length); int actualCount = 0; @@ -717,7 +717,7 @@ public class TestWALSplit { conf.setBoolean(HBASE_SKIP_ERRORS, false); generateWALs(-1); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR); assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length); } @@ -733,7 +733,7 @@ public class TestWALSplit { throws IOException { generateWALs(-1); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); FileStatus [] statuses = null; try { statuses = fs.listStatus(WALDIR); @@ -763,7 +763,7 @@ public class TestWALSplit { try { InstrumentedLogWriter.activateFailure = true; - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); } catch (IOException e) { assertTrue(e.getMessage(). contains("This exception is instrumented and should only be thrown for testing")); @@ -784,7 +784,7 @@ public class TestWALSplit { Path regiondir = new Path(TABLEDIR, region); fs.delete(regiondir, true); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); assertFalse(fs.exists(regiondir)); } @@ -861,7 +861,7 @@ public class TestWALSplit { useDifferentDFSClient(); try { - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals); assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length); assertFalse(fs.exists(WALDIR)); } catch (IOException e) { @@ -1085,7 +1085,7 @@ public class TestWALSplit { Path regiondir = new Path(TABLEDIR, REGION); LOG.info("Region directory is" + regiondir); fs.delete(regiondir, true); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); assertFalse(fs.exists(regiondir)); } @@ -1098,7 +1098,7 @@ public class TestWALSplit { injectEmptyFile(".empty", true); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME); assertFalse(fs.exists(tdir)); @@ -1123,7 +1123,7 @@ public class TestWALSplit { Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); assertEquals(1, fs.listStatus(corruptDir).length); @@ -1151,14 +1151,14 @@ public class TestWALSplit { @Override protected Writer createWriter(Path logfile) throws IOException { - Writer writer = wals.createRecoveredEditsWriter(this.fs, logfile); + Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile); // After creating writer, simulate region's // replayRecoveredEditsIfAny() which gets SplitEditFiles of this // region and delete them, excluding files with '.temp' suffix. NavigableSet files = WALSplitter.getSplitEditFilesSorted(fs, regiondir); if (files != null && !files.isEmpty()) { for (Path file : files) { - if (!this.fs.delete(file, false)) { + if (!this.walFS.delete(file, false)) { LOG.error("Failed delete of " + file); } else { LOG.debug("Deleted recovered.edits file=" + file); @@ -1237,9 +1237,9 @@ public class TestWALSplit { - private Path[] getLogForRegion(Path rootdir, TableName table, String region) + private Path[] getLogForRegion(TableName table, String region) throws IOException { - Path tdir = FSUtils.getTableDir(rootdir, table); + Path tdir = FSUtils.getWALTableDir(conf, table); @SuppressWarnings("deprecation") Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, Bytes.toString(Bytes.toBytes(region))));