diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java index 13804ed4f82..8daff9d4284 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java @@ -179,7 +179,7 @@ public class SplitLogWorker implements Runnable { server.getCoordinatedStateManager() == null ? null : server.getCoordinatedStateManager().getSplitLogWorkerCoordination(); if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, name)), fs, conf, p, - sequenceIdChecker, splitLogWorkerCoordination, factory)) { + sequenceIdChecker, splitLogWorkerCoordination, factory, server)) { return Status.PREEMPTED; } } catch (InterruptedIOException iioe) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java index 406e90d8d19..accfe56bfad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.wal; import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; + import java.io.IOException; import java.io.InterruptedIOException; import java.util.HashMap; @@ -28,25 +29,20 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparatorImpl; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.regionserver.CellSet; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.EntryBuffers.RegionEntryBuffer; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.yetus.audience.InterfaceAudience; @@ -61,10 +57,6 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink { public static final boolean DEFAULT_WAL_SPLIT_TO_HFILE = false; private final WALSplitter walSplitter; - private final Map tableDescCache; - private Connection connection; - private Admin admin; - private FileSystem rootFS; // Since the splitting process may create multiple output files, we need a map // to track the output count of each region. @@ -72,19 +64,13 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink { // Need a counter to track the opening writers. private final AtomicInteger openingWritersNum = new AtomicInteger(0); + private final ConcurrentMap tableDescCache; + public BoundedRecoveredHFilesOutputSink(WALSplitter walSplitter, WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) { super(controller, entryBuffers, numWriters); this.walSplitter = walSplitter; - tableDescCache = new HashMap<>(); - } - - @Override - void startWriterThreads() throws IOException { - connection = ConnectionFactory.createConnection(walSplitter.conf); - admin = connection.getAdmin(); - rootFS = FSUtils.getRootDirFileSystem(walSplitter.conf); - super.startWriterThreads(); + this.tableDescCache = new ConcurrentHashMap<>(); } @Override @@ -137,8 +123,6 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink { } finally { isSuccessful &= writeRemainingEntryBuffers(); } - IOUtils.closeQuietly(admin); - IOUtils.closeQuietly(connection); return isSuccessful ? splits : null; } @@ -199,10 +183,10 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink { long seqId, String familyName, boolean isMetaTable) throws IOException { Path outputFile = WALSplitUtil .getRegionRecoveredHFilePath(tableName, regionName, familyName, seqId, - walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.conf, rootFS); + walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.conf, walSplitter.rootFS); checkPathValid(outputFile); StoreFileWriter.Builder writerBuilder = - new StoreFileWriter.Builder(walSplitter.conf, CacheConfig.DISABLED, rootFS) + new StoreFileWriter.Builder(walSplitter.conf, CacheConfig.DISABLED, walSplitter.rootFS) .withFilePath(outputFile); HFileContextBuilder hFileContextBuilder = new HFileContextBuilder(); if (isMetaTable) { @@ -216,10 +200,11 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink { private void configContextForNonMetaWriter(TableName tableName, String familyName, HFileContextBuilder hFileContextBuilder, StoreFileWriter.Builder writerBuilder) throws IOException { - if (!tableDescCache.containsKey(tableName)) { - tableDescCache.put(tableName, admin.getDescriptor(tableName)); + TableDescriptor tableDesc = + tableDescCache.computeIfAbsent(tableName, t -> getTableDescriptor(t)); + if (tableDesc == null) { + throw new IOException("Failed to get table descriptor for table " + tableName); } - TableDescriptor tableDesc = tableDescCache.get(tableName); ColumnFamilyDescriptor cfd = tableDesc.getColumnFamily(Bytes.toBytesBinary(familyName)); hFileContextBuilder.withCompression(cfd.getCompressionType()).withBlockSize(cfd.getBlocksize()) .withCompressTags(cfd.isCompressTags()).withDataBlockEncoding(cfd.getDataBlockEncoding()) @@ -228,11 +213,27 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink { } private void checkPathValid(Path outputFile) throws IOException { - if (rootFS.exists(outputFile)) { + if (walSplitter.rootFS.exists(outputFile)) { LOG.warn("this file {} may be left after last failed split ", outputFile); - if (!rootFS.delete(outputFile, false)) { + if (!walSplitter.rootFS.delete(outputFile, false)) { LOG.warn("delete old generated HFile {} failed", outputFile); } } } + + private TableDescriptor getTableDescriptor(TableName tableName) { + if (walSplitter.rsServices != null) { + try { + return walSplitter.rsServices.getConnection().getAdmin().getDescriptor(tableName); + } catch (IOException e) { + LOG.warn("Failed to get table descriptor for table {}", tableName, e); + } + } + try { + return walSplitter.tableDescriptors.get(tableName); + } catch (IOException e) { + LOG.warn("Failed to get table descriptor for table {}", tableName, e); + return null; + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 8a66c172114..a7284535319 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -40,16 +40,19 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.procedure2.util.StringUtils; import org.apache.hadoop.hbase.regionserver.LastSequenceId; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Reader; @@ -82,6 +85,10 @@ public class WALSplitter { protected final Path walDir; protected final FileSystem walFS; protected final Configuration conf; + final Path rootDir; + final FileSystem rootFS; + final RegionServerServices rsServices; + final TableDescriptors tableDescriptors; // Major subcomponents of the split process. // These are separated into inner classes to make testing easier. @@ -114,21 +121,31 @@ public class WALSplitter { @VisibleForTesting WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS, - LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination) { + Path rootDir, FileSystem rootFS, LastSequenceId idChecker, + SplitLogWorkerCoordination splitLogWorkerCoordination, RegionServerServices rsServices) { this.conf = HBaseConfiguration.create(conf); String codecClassName = conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName); this.walDir = walDir; this.walFS = walFS; + this.rootDir = rootDir; + this.rootFS = rootFS; this.sequenceIdChecker = idChecker; this.splitLogWorkerCoordination = splitLogWorkerCoordination; + this.rsServices = rsServices; + if (rsServices != null) { + this.tableDescriptors = rsServices.getTableDescriptors(); + } else { + this.tableDescriptors = new FSTableDescriptors(rootFS, rootDir, true, true); + } this.walFactory = factory; PipelineController controller = new PipelineController(); this.tmpDirName = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); + // if we limit the number of writers opened for sinking recovered edits boolean splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false); boolean splitToHFile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE); @@ -175,10 +192,12 @@ public class WALSplitter { */ public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS, Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker, - SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory) - throws IOException { - WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, idChecker, - splitLogWorkerCoordination); + SplitLogWorkerCoordination splitLogWorkerCoordination, WALFactory factory, + RegionServerServices rsServices) throws IOException { + Path rootDir = FSUtils.getRootDir(conf); + FileSystem rootFS = rootDir.getFileSystem(conf); + WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, idChecker, + splitLogWorkerCoordination, rsServices); return s.splitLogFile(logfile, reporter); } @@ -187,16 +206,19 @@ public class WALSplitter { // It is public only because TestWALObserver is in a different package, // which uses this method to do log splitting. @VisibleForTesting - public static List split(Path rootDir, Path logDir, Path oldLogDir, - FileSystem walFS, Configuration conf, final WALFactory factory) throws IOException { - final FileStatus[] logfiles = SplitLogManager.getFileList(conf, - Collections.singletonList(logDir), null); + public static List split(Path walDir, Path logDir, Path oldLogDir, FileSystem walFS, + Configuration conf, final WALFactory factory) throws IOException { + Path rootDir = FSUtils.getRootDir(conf); + FileSystem rootFS = rootDir.getFileSystem(conf); + final FileStatus[] logfiles = + SplitLogManager.getFileList(conf, Collections.singletonList(logDir), null); List splits = new ArrayList<>(); if (ArrayUtils.isNotEmpty(logfiles)) { - for (FileStatus logfile: logfiles) { - WALSplitter s = new WALSplitter(factory, conf, rootDir, walFS, null, null); + for (FileStatus logfile : logfiles) { + WALSplitter s = + new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, null, null, null); if (s.splitLogFile(logfile, null)) { - finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf); + finishSplitLogFile(walDir, oldLogDir, logfile.getPath(), conf); if (s.outputSink.splits != null) { splits.addAll(s.outputSink.splits); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index 67789995caf..fffe94bbf05 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -907,8 +907,8 @@ public abstract class AbstractTestWALReplay { FileStatus[] listStatus = wal.getFiles(); assertNotNull(listStatus); assertTrue(listStatus.length > 0); - WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], - this.fs, this.conf, null, null, null, wals); + WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf, null, null, null, + wals, null); FileStatus[] listStatus1 = this.fs.listStatus(new Path(FSUtils.getWALTableDir(conf, tableName), new Path(hri.getEncodedName(), "recovered.edits")), new PathFilter() { @@ -1058,8 +1058,8 @@ public abstract class AbstractTestWALReplay { first = fs.getFileStatus(smallFile); second = fs.getFileStatus(largeFile); } - WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, wals); - WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, wals); + WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, wals, null); + WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, wals, null); WAL wal = createWAL(this.conf, hbaseRootDir, logName); region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal); assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java index 80b42c64b31..84f101f254b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java @@ -173,7 +173,7 @@ public class TestWALReaderOnSecureWAL { FileStatus[] listStatus = fs.listStatus(walPath.getParent()); Path rootdir = FSUtils.getRootDir(conf); try { - WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null); + WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, null); s.splitLogFile(listStatus[0], null); Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()), "corrupt"); @@ -217,7 +217,7 @@ public class TestWALReaderOnSecureWAL { FileStatus[] listStatus = fs.listStatus(walPath.getParent()); Path rootdir = FSUtils.getRootDir(conf); try { - WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null); + WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, null); s.splitLogFile(listStatus[0], null); Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()), "corrupt"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java index 33d54771b91..b028d34a105 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java @@ -812,15 +812,16 @@ public class TestWALSplit { } assertTrue("There should be some log greater than size 0.", 0 < largestSize); // Set up a splitter that will throw an IOE on the output side - WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null) { - @Override - protected Writer createWriter(Path logfile) throws IOException { - Writer mockWriter = Mockito.mock(Writer.class); - Mockito.doThrow(new IOException("Injected")).when( - mockWriter).append(Mockito.any()); - return mockWriter; - } - }; + WALSplitter logSplitter = + new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) { + @Override + protected Writer createWriter(Path logfile) throws IOException { + Writer mockWriter = Mockito.mock(Writer.class); + Mockito.doThrow(new IOException("Injected")).when(mockWriter) + .append(Mockito. any()); + return mockWriter; + } + }; // Set up a background thread dumper. Needs a thread to depend on and then we need to run // the thread dumping in a background thread so it does not hold up the test. final AtomicBoolean stop = new AtomicBoolean(false); @@ -941,8 +942,8 @@ public class TestWALSplit { try { conf.setInt("hbase.splitlog.report.period", 1000); - boolean ret = WALSplitter.splitLogFile( - HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, wals); + boolean ret = WALSplitter.splitLogFile(HBASEDIR, logfile, spiedFs, conf, localReporter, null, + null, wals, null); assertFalse("Log splitting should failed", ret); assertTrue(count.get() > 0); } catch (IOException e) { @@ -1000,7 +1001,8 @@ public class TestWALSplit { makeRegionDirs(regions); // Create a splitter that reads and writes the data without touching disk - WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, null, null) { + WALSplitter logSplitter = + new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) { /* Produce a mock writer that doesn't write anywhere */ @Override @@ -1148,7 +1150,8 @@ public class TestWALSplit { assertTrue("There should be some log file", logfiles != null && logfiles.length > 0); - WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null) { + WALSplitter logSplitter = + new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) { @Override protected Writer createWriter(Path logfile) throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java index 2e60a0eda60..eaf6b6021f8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java @@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; import org.junit.After; import org.junit.AfterClass; @@ -172,8 +173,9 @@ public class TestWALSplitToHFile { final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName()); final TableDescriptor td = createBasic3FamilyTD(tableName); final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build(); - final Path basedir = FSUtils.getTableDir(this.rootDir, tableName); - deleteDir(basedir); + final Path tableDir = FSUtils.getTableDir(this.rootDir, tableName); + deleteDir(tableDir); + FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false); final byte[] rowName = tableName.getName(); final int countPerFamily = 10; @@ -203,7 +205,12 @@ public class TestWALSplitToHFile { // all edits in logs are seen as 'stale'/old. region.close(true); wal.shutdown(); - WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals); + try { + WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals); + } catch (Exception e) { + LOG.debug("Got exception", e); + } + WAL wal2 = createWAL(this.conf, rootDir, logName); HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2); long seqid2 = region2.getOpenSeqNum(); @@ -230,7 +237,7 @@ public class TestWALSplitToHFile { FileSystem newFS = FileSystem.get(newConf); // Make a new wal for new region open. WAL wal3 = createWAL(newConf, rootDir, logName); - HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, ri, td, null); + HRegion region3 = new HRegion(tableDir, wal3, newFS, newConf, ri, td, null); long seqid3 = region3.initialize(); Result result3 = region3.get(g); // Assert that count of cells is same as before crash. @@ -259,11 +266,12 @@ public class TestWALSplitToHFile { throws IOException, SecurityException, IllegalArgumentException { final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName()); final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build(); - final Path basedir = FSUtils.getTableDir(this.rootDir, tableName); - deleteDir(basedir); + final Path tableDir = FSUtils.getTableDir(this.rootDir, tableName); + deleteDir(tableDir); final byte[] rowName = tableName.getName(); final int countPerFamily = 10; final TableDescriptor td = createBasic3FamilyTD(tableName); + FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false); HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td); HBaseTestingUtility.closeRegionAndWAL(region3); // Write countPerFamily edits into the three families. Do a flush on one @@ -318,9 +326,10 @@ public class TestWALSplitToHFile { public void testReplayEditsAfterAbortingFlush() throws IOException { final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName()); final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build(); - final Path basedir = FSUtils.getTableDir(this.rootDir, tableName); - deleteDir(basedir); + final Path tableDir = FSUtils.getTableDir(this.rootDir, tableName); + deleteDir(tableDir); final TableDescriptor td = createBasic3FamilyTD(tableName); + FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false); HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td); HBaseTestingUtility.closeRegionAndWAL(region3); // Write countPerFamily edits into the three families. Do a flush on one