From a693a8fd9581a4196143beaa4a8e596d6a081f31 Mon Sep 17 00:00:00 2001
From: Guanghao Zhang
Date: Fri, 27 Dec 2019 15:59:23 +0800
Subject: [PATCH] HBASE-23286 Improve MTTR: Split WAL to HFile (#820)

Signed-off-by: Duo Zhang
---
 .../org/apache/hadoop/hbase/HConstants.java   |  14 +-
 .../hadoop/hbase/regionserver/CellSet.java    |   2 +-
 .../hadoop/hbase/regionserver/HRegion.java    |  28 ++
 .../RegionReplicaReplicationEndpoint.java     |   2 +-
 .../org/apache/hadoop/hbase/util/FSUtils.java |   7 +-
 .../wal/BoundedRecoveredEditsOutputSink.java  |   8 +-
 .../wal/BoundedRecoveredHFilesOutputSink.java | 240 +++++++++++
 .../apache/hadoop/hbase/wal/OutputSink.java   |   4 +-
 .../hbase/wal/RecoveredEditsOutputSink.java   |   6 +-
 .../apache/hadoop/hbase/wal/WALSplitUtil.java |  61 ++-
 .../apache/hadoop/hbase/wal/WALSplitter.java  |  15 +-
 .../wal/AbstractTestWALReplay.java            |   4 +-
 .../apache/hadoop/hbase/wal/TestWALSplit.java |   9 +-
 .../hadoop/hbase/wal/TestWALSplitToHFile.java | 408 ++++++++++++++++++
 14 files changed, 780 insertions(+), 28 deletions(-)
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 64692278a17..cb493020bf4 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -52,11 +52,19 @@ public final class HConstants {
   /** Used as a magic return value while optimized index key feature enabled(HBASE-7845) */
   public final static int INDEX_KEY_MAGIC = -2;
+
   /*
-  * Name of directory that holds recovered edits written by the wal log
-  * splitting code, one per region
-  */
+   * Name of directory that holds recovered edits written by the wal log
+   * splitting code, one per region
+   */
   public static final String RECOVERED_EDITS_DIR = "recovered.edits";
+
+  /*
+   * Name of directory that holds recovered hfiles written by the wal log
+   * splitting code, one per region
+   */
+  public static final String RECOVERED_HFILES_DIR = "recovered.hfiles";
+
   /**
    * The first four bytes of Hadoop RPC connections
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
index 5190d7b1d97..d74655d1b7a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
@@ -52,7 +52,7 @@ public class CellSet implements NavigableSet<Cell> {
 
   private final int numUniqueKeys;
 
-  CellSet(final CellComparator c) {
+  public CellSet(final CellComparator c) {
     this.delegatee = new ConcurrentSkipListMap<>(c.getSimpleComparator());
     this.numUniqueKeys = UNKNOWN_NUM_UNIQUES;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 067ce2b51bb..a55e2fe069c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -975,6 +975,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // Recover any edits if available.
      maxSeqId = Math.max(maxSeqId,
        replayRecoveredEditsIfAny(maxSeqIdInStores, reporter, status));
+      // Recover any hfiles if available
+      maxSeqId = Math.max(maxSeqId, loadRecoveredHFilesIfAny(stores));
       // Make sure mvcc is up to max.
       this.mvcc.advanceTo(maxSeqId);
     } finally {
@@ -5332,6 +5334,32 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  private long loadRecoveredHFilesIfAny(Collection<HStore> stores) throws IOException {
+    Path regionDir = getWALRegionDir();
+    long maxSeqId = -1;
+    for (HStore store : stores) {
+      String familyName = store.getColumnFamilyName();
+      FileStatus[] files =
+        WALSplitUtil.getRecoveredHFiles(fs.getFileSystem(), regionDir, familyName);
+      if (files != null && files.length != 0) {
+        for (FileStatus file : files) {
+          store.assertBulkLoadHFileOk(file.getPath());
+          Pair<Path, Path> pair = store.preBulkLoadHFile(file.getPath().toString(), -1);
+          store.bulkLoadHFile(Bytes.toBytes(familyName), pair.getFirst().toString(),
+            pair.getSecond());
+          maxSeqId =
+            Math.max(maxSeqId, WALSplitUtil.getSeqIdForRecoveredHFile(file.getPath().getName()));
+        }
+        if (this.rsServices != null && store.needsCompaction()) {
+          this.rsServices.getCompactionRequestor()
+            .requestCompaction(this, store, "load recovered hfiles request compaction",
+              Store.PRIORITY_USER + 1, CompactionLifeCycleTracker.DUMMY, null);
+        }
+      }
+    }
+    return maxSeqId;
+  }
+
   /**
    * Be careful, this method will drop all data in the memstore of this region.
    * Currently, this method is used to drop memstore to prevent memory leak
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index 707057fe4a0..60f693ae279 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -328,7 +328,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
     }
 
     @Override
-    public Map<byte[], Long> getOutputCounts() {
+    public Map<String, Long> getOutputCounts() {
       return null; // only used in tests
     }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 3d6dbab8b2a..ca1f9383cf7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -1031,7 +1031,12 @@ public abstract class FSUtils extends CommonFSUtils {
   }
 
   public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
-    return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
+    return getRegionDirFromTableDir(tableDir,
+      ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
+  }
+
+  public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) {
+    return new Path(tableDir, encodedRegionName);
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
index a25820543f4..0ed7c20b889 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
@@ -23,11 +23,13 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -45,7 +47,7 @@ class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
 
   // Since the splitting process may create multiple output files, we need a map
   // to track the output count of each region.
-  private ConcurrentHashMap<byte[], Long> regionEditsWrittenMap = new ConcurrentHashMap<>();
+  private ConcurrentMap<String, Long> regionEditsWrittenMap = new ConcurrentHashMap<>();
   // Need a counter to track the opening writers.
   private final AtomicInteger openingWritersNum = new AtomicInteger(0);
 
@@ -68,7 +70,7 @@ class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
     if (writer != null) {
       openingWritersNum.incrementAndGet();
       writer.writeRegionEntries(entries);
-      regionEditsWrittenMap.compute(buffer.encodedRegionName,
+      regionEditsWrittenMap.compute(Bytes.toString(buffer.encodedRegionName),
         (k, v) -> v == null ? writer.editsWritten : v + writer.editsWritten);
       List<IOException> thrown = new ArrayList<>();
       Path dst = closeRecoveredEditsWriter(writer, thrown);
@@ -125,7 +127,7 @@ class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
   }
 
   @Override
-  public Map<byte[], Long> getOutputCounts() {
+  public Map<String, Long> getOutputCounts() {
     return regionEditsWrittenMap;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
new file mode 100644
index 00000000000..be21ec67082
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.regionserver.CellSet;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.EntryBuffers.RegionEntryBuffer;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A WALSplitter sink that writes the recovered edits straight into HFiles, one HFile per
+ * column family per region, under the family's recovered.hfiles directory. The resulting
+ * files are bulk-loaded, rather than replayed, when the region next opens.
+ */
+@InterfaceAudience.Private
+public class BoundedRecoveredHFilesOutputSink extends OutputSink {
+  private static final Logger LOG = LoggerFactory.getLogger(BoundedRecoveredHFilesOutputSink.class);
+
+  public static final String WAL_SPLIT_TO_HFILE = "hbase.wal.split.to.hfile";
+  public static final boolean DEFAULT_WAL_SPLIT_TO_HFILE = false;
+
+  private final WALSplitter walSplitter;
+  private final Map<TableName, TableDescriptor> tableDescCache;
+  private Connection connection;
+  private Admin admin;
+  private FileSystem rootFS;
+
+  // Since the splitting process may create multiple output files, we need a map
+  // to track the output count of each region.
+  private ConcurrentMap<String, Long> regionEditsWrittenMap = new ConcurrentHashMap<>();
+  // Need a counter to track the opening writers.
+  private final AtomicInteger openingWritersNum = new AtomicInteger(0);
+
+  public BoundedRecoveredHFilesOutputSink(WALSplitter walSplitter,
+      WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
+    super(controller, entryBuffers, numWriters);
+    this.walSplitter = walSplitter;
+    tableDescCache = new HashMap<>();
+  }
+
+  @Override
+  public void startWriterThreads() throws IOException {
+    connection = ConnectionFactory.createConnection(walSplitter.conf);
+    admin = connection.getAdmin();
+    rootFS = FSUtils.getRootDirFileSystem(walSplitter.conf);
+    super.startWriterThreads();
+  }
+
+  @Override
+  public void append(RegionEntryBuffer buffer) throws IOException {
+    Map<String, CellSet> familyCells = new HashMap<>();
+    Map<String, Long> familySeqIds = new HashMap<>();
+    boolean isMetaTable = buffer.tableName.equals(META_TABLE_NAME);
+    for (WAL.Entry entry : buffer.entries) {
+      long seqId = entry.getKey().getSequenceId();
+      List<Cell> cells = entry.getEdit().getCells();
+      for (Cell cell : cells) {
+        if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
+          continue;
+        }
+        String familyName = Bytes.toString(CellUtil.cloneFamily(cell));
+        // The comparator needs to be specified for meta
+        familyCells.computeIfAbsent(familyName, key -> new CellSet(
+          isMetaTable ? CellComparatorImpl.META_COMPARATOR : CellComparator.getInstance()))
+          .add(cell);
+        familySeqIds.compute(familyName, (k, v) -> v == null ? seqId : Math.max(v, seqId));
+      }
+    }
+
+    // The key point is to create a new writer for each column family, write the edits,
+    // then close the writer.
+    String regionName = Bytes.toString(buffer.encodedRegionName);
+    for (Map.Entry<String, CellSet> cellsEntry : familyCells.entrySet()) {
+      String familyName = cellsEntry.getKey();
+      StoreFileWriter writer = createRecoveredHFileWriter(buffer.tableName, regionName,
+        familySeqIds.get(familyName), familyName, isMetaTable);
+      openingWritersNum.incrementAndGet();
+      try {
+        for (Cell cell : cellsEntry.getValue()) {
+          writer.append(cell);
+        }
+        regionEditsWrittenMap.compute(Bytes.toString(buffer.encodedRegionName),
+          (k, v) -> v == null ? buffer.entries.size() : v + buffer.entries.size());
+        splits.add(writer.getPath());
+        openingWritersNum.decrementAndGet();
+      } finally {
+        writer.close();
+      }
+    }
+  }
+
+  @Override
+  public List<Path> close() throws IOException {
+    boolean isSuccessful = true;
+    try {
+      isSuccessful &= finishWriterThreads(false);
+    } finally {
+      isSuccessful &= writeRemainingEntryBuffers();
+    }
+    IOUtils.closeQuietly(admin);
+    IOUtils.closeQuietly(connection);
+    return isSuccessful ? splits : null;
+  }
+
+  /**
+   * Write out the remaining RegionEntryBuffers and close the writers.
+   *
+   * @return true when there is no error.
+   */
+  private boolean writeRemainingEntryBuffers() throws IOException {
+    for (EntryBuffers.RegionEntryBuffer buffer : entryBuffers.buffers.values()) {
+      closeCompletionService.submit(() -> {
+        append(buffer);
+        return null;
+      });
+    }
+    boolean progressFailed = false;
+    try {
+      for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
+        Future<Void> future = closeCompletionService.take();
+        future.get();
+        if (!progressFailed && reporter != null && !reporter.progress()) {
+          progressFailed = true;
+        }
+      }
+    } catch (InterruptedException e) {
+      IOException iie = new InterruptedIOException();
+      iie.initCause(e);
+      throw iie;
+    } catch (ExecutionException e) {
+      throw new IOException(e.getCause());
+    } finally {
+      closeThreadPool.shutdownNow();
+    }
+    return !progressFailed;
+  }
+
+  @Override
+  public Map<String, Long> getOutputCounts() {
+    return regionEditsWrittenMap;
+  }
+
+  @Override
+  public int getNumberOfRecoveredRegions() {
+    return regionEditsWrittenMap.size();
+  }
+
+  @Override
+  public int getNumOpenWriters() {
+    return openingWritersNum.get();
+  }
+
+  @Override
+  public boolean keepRegionEvent(Entry entry) {
+    return false;
+  }
+
+  private StoreFileWriter createRecoveredHFileWriter(TableName tableName, String regionName,
+      long seqId, String familyName, boolean isMetaTable) throws IOException {
+    Path outputFile = WALSplitUtil
+      .getRegionRecoveredHFilePath(tableName, regionName, familyName, seqId,
+        walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.conf, rootFS);
+    checkPathValid(outputFile);
+    StoreFileWriter.Builder writerBuilder =
+      new StoreFileWriter.Builder(walSplitter.conf, CacheConfig.DISABLED, rootFS)
+        .withFilePath(outputFile);
+    HFileContextBuilder hFileContextBuilder = new HFileContextBuilder();
+    if (isMetaTable) {
+      writerBuilder.withComparator(CellComparatorImpl.META_COMPARATOR);
+    } else {
+      configContextForNonMetaWriter(tableName, familyName, hFileContextBuilder, writerBuilder);
+    }
+    return writerBuilder.withFileContext(hFileContextBuilder.build()).build();
+  }
+
+  private void configContextForNonMetaWriter(TableName tableName, String familyName,
+      HFileContextBuilder hFileContextBuilder, StoreFileWriter.Builder writerBuilder)
+      throws IOException {
+    if (!tableDescCache.containsKey(tableName)) {
+      tableDescCache.put(tableName, admin.getDescriptor(tableName));
+    }
+    TableDescriptor tableDesc = tableDescCache.get(tableName);
+    ColumnFamilyDescriptor cfd = tableDesc.getColumnFamily(Bytes.toBytesBinary(familyName));
+    hFileContextBuilder.withCompression(cfd.getCompressionType()).withBlockSize(cfd.getBlocksize())
+      .withCompressTags(cfd.isCompressTags()).withDataBlockEncoding(cfd.getDataBlockEncoding());
+    writerBuilder.withBloomType(cfd.getBloomFilterType())
+      .withComparator(CellComparatorImpl.COMPARATOR);
+  }
+
+  private void checkPathValid(Path outputFile) throws IOException {
+    if (rootFS.exists(outputFile)) {
+      LOG.warn("This file {} may be left over from a previous failed split; deleting it",
+        outputFile);
+      if (!rootFS.delete(outputFile, false)) {
+        LOG.warn("Failed to delete old generated HFile {}", outputFile);
+      }
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
index b60b8894e23..b58913489b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
@@ -81,7 +81,7 @@ public abstract class OutputSink {
 
   /**
    * Start the threads that will pump data from the entryBuffers to the output files.
   */
-  public synchronized void startWriterThreads() {
+  public void startWriterThreads() throws IOException {
     for (int i = 0; i < numThreads; i++) {
       WriterThread t = new WriterThread(controller, entryBuffers, this, i);
       t.start();
@@ -142,7 +142,7 @@ public abstract class OutputSink {
   /**
    * @return a map from encoded region ID to the number of edits written out for that region.
    */
-  protected abstract Map<byte[], Long> getOutputCounts();
+  protected abstract Map<String, Long> getOutputCounts();
 
   /**
    * @return number of regions we've recovered
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
index 67b6e6759a2..c5cfd31d4bc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
@@ -135,10 +135,10 @@ class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
   }
 
   @Override
-  public Map<byte[], Long> getOutputCounts() {
-    TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+  public Map<String, Long> getOutputCounts() {
+    TreeMap<String, Long> ret = new TreeMap<>();
     for (Map.Entry<String, RecoveredEditsWriter> entry : writers.entrySet()) {
-      ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
+      ret.put(entry.getKey(), entry.getValue().editsWritten);
     }
     return ret;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
index d68549f32b7..026eb25bedc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
@@ -28,6 +28,7 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -164,7 +165,7 @@ public final class WALSplitUtil {
    * RECOVERED_EDITS_DIR under the region creating it if necessary.
    * @param tableName the table name
    * @param encodedRegionName the encoded region name
-   * @param sedId the sequence id which used to generate file name
+   * @param seqId the sequence id which is used to generate the file name
    * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name.
    * @param tmpDirName of the directory used to sideline old recovered edits file
    * @param conf configuration
@@ -173,7 +174,7 @@ public final class WALSplitUtil {
    */
   @SuppressWarnings("deprecation")
   @VisibleForTesting
-  static Path getRegionSplitEditsPath(TableName tableName, byte[] encodedRegionName, long sedId,
+  static Path getRegionSplitEditsPath(TableName tableName, byte[] encodedRegionName, long seqId,
     String fileNameBeingSplit, String tmpDirName, Configuration conf) throws IOException {
     FileSystem walFS = FSUtils.getWALFileSystem(conf);
     Path tableDir = FSUtils.getWALTableDir(conf, tableName);
@@ -202,7 +203,7 @@ public final class WALSplitUtil {
     // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
     // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
     // region's replayRecoveredEdits will not delete it
-    String fileName = formatRecoveredEditsFileName(sedId);
+    String fileName = formatRecoveredEditsFileName(seqId);
     fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit);
     return new Path(dir, fileName);
   }
@@ -562,4 +563,58 @@ public final class WALSplitUtil {
     return mutations;
   }
+
+  /**
+   * Path to a file under recovered.hfiles directory of the region's column family: e.g.
+   * /hbase/some_table/2323432434/cf/recovered.hfiles/2332-wal. This method also ensures existence
+   * of recovered.hfiles directory under the region's column family, creating it if necessary.
+   *
+   * @param tableName the table name
+   * @param encodedRegionName the encoded region name
+   * @param familyName the column family name
+   * @param seqId the sequence id which is used to generate the file name
+   * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name
+   * @param conf configuration
+   * @param rootFS the root file system
+   * @return Path to file into which to dump split log edits.
+   */
+  static Path getRegionRecoveredHFilePath(TableName tableName, String encodedRegionName,
+      String familyName, long seqId, String fileNameBeingSplit, Configuration conf,
+      FileSystem rootFS) throws IOException {
+    Path rootDir = FSUtils.getRootDir(conf);
+    Path regionDir =
+      FSUtils.getRegionDirFromTableDir(FSUtils.getTableDir(rootDir, tableName), encodedRegionName);
+    Path dir = getStoreDirRecoveredHFilesDir(regionDir, familyName);
+
+    if (!rootFS.exists(dir) && !rootFS.mkdirs(dir)) {
+      LOG.warn("mkdir failed on {}, region {}, column family {}", dir, encodedRegionName,
+        familyName);
+    }
+
+    String fileName = formatRecoveredHFileName(seqId, fileNameBeingSplit);
+    return new Path(dir, fileName);
+  }
+
+  private static String formatRecoveredHFileName(long seqId, String fileNameBeingSplit) {
+    return String.format("%019d", seqId) + "-" + fileNameBeingSplit;
+  }
+
+  public static long getSeqIdForRecoveredHFile(String fileName) {
+    return Long.parseLong(fileName.split("-")[0]);
+  }
+
+  /**
+   * @param regionDir This region's directory in the filesystem
+   * @param familyName The column family name
+   * @return The directory that holds recovered hfiles for the region's column family
+   */
+  private static Path getStoreDirRecoveredHFilesDir(final Path regionDir, String familyName) {
+    return new Path(new Path(regionDir, familyName), HConstants.RECOVERED_HFILES_DIR);
+  }
+
+  public static FileStatus[] getRecoveredHFiles(final FileSystem rootFS,
+      final Path regionDir, String familyName) throws IOException {
+    Path dir = getStoreDirRecoveredHFilesDir(regionDir, familyName);
+    return FSUtils.listStatus(rootFS, dir);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index c60c8828bd6..d7bbd072c37 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.wal;
 
+import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.DEFAULT_WAL_SPLIT_TO_HFILE;
+import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE;
 import static org.apache.hadoop.hbase.wal.WALSplitUtil.finishSplitLogFile;
 
 import java.io.EOFException;
@@ -111,7 +113,7 @@ public class WALSplitter {
   @VisibleForTesting
   WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS,
-    LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination) {
+      LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination) {
     this.conf = HBaseConfiguration.create(conf);
     String codecClassName =
       conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
@@ -128,16 +130,21 @@ public class WALSplitter {
     // if we limit the number of writers opened for sinking recovered edits
     boolean splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
+    boolean splitToHFile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE);
     long bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024);
     int numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3);
-    if (splitWriterCreationBounded) {
+
+    if (splitToHFile) {
+      entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
+      outputSink =
+        new BoundedRecoveredHFilesOutputSink(this, controller, entryBuffers, numWriterThreads);
+    } else if (splitWriterCreationBounded) {
       entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
       outputSink =
         new BoundedRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
     } else {
       entryBuffers = new EntryBuffers(controller, bufferSize);
-      outputSink =
-        new RecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
+      outputSink = new RecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index d05fcce7ee9..4f2d482f351 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -642,7 +642,7 @@ public abstract class AbstractTestWALReplay {
   // Only throws exception if throwExceptionWhenFlushing is set true.
   public static class CustomStoreFlusher extends DefaultStoreFlusher {
     // Switch between throw and not throw exception in flush
-    static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
+    public static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
 
     public CustomStoreFlusher(Configuration conf, HStore store) {
       super(conf, store);
@@ -1173,7 +1173,7 @@ public abstract class AbstractTestWALReplay {
     wal.sync();
   }
 
-  static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count,
+  public static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count,
     EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException {
     List<Put> puts = new ArrayList<>();
     for (int j = 0; j < count; j++) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index 8ddd0ea96a3..33d54771b91 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -1064,11 +1064,10 @@ public class TestWALSplit {
     logSplitter.splitLogFile(fs.getFileStatus(logPath), null);
 
     // Verify number of written edits per region
-    Map<byte[], Long> outputCounts = logSplitter.outputSink.getOutputCounts();
-    for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
-      LOG.info("Got " + entry.getValue() + " output edits for region " +
-        Bytes.toString(entry.getKey()));
-      assertEquals((long)entry.getValue(), numFakeEdits / regions.size());
+    Map<String, Long> outputCounts = logSplitter.outputSink.getOutputCounts();
+    for (Map.Entry<String, Long> entry : outputCounts.entrySet()) {
+      LOG.info("Got " + entry.getValue() + " output edits for region " + entry.getKey());
+      assertEquals((long) entry.getValue(), numFakeEdits / regions.size());
     }
     assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size());
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java
new file mode 100644
index 00000000000..e1ea0368ecb
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java
@@ -0,0 +1,408 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits;
+import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ RegionServerTests.class, LargeTests.class })
+public class TestWALSplitToHFile {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestWALSplitToHFile.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestWALSplitToHFile.class);
+  static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
+  private Path rootDir = null;
+  private String logName;
+  private Path oldLogDir;
+  private Path logDir;
+  private FileSystem fs;
+  private Configuration conf;
+  private WALFactory wals;
+
+  @Rule
+  public final TestName TEST_NAME = new TestName();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setBoolean(WAL_SPLIT_TO_HFILE, true);
+    UTIL.startMiniCluster(3);
+    Path hbaseRootDir = UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
+    LOG.info("hbase.rootdir=" + hbaseRootDir);
+    FSUtils.setRootDir(conf, hbaseRootDir);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
+    this.fs = UTIL.getDFSCluster().getFileSystem();
+    this.rootDir = FSUtils.getRootDir(this.conf);
+    this.oldLogDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    String serverName =
+      ServerName.valueOf(TEST_NAME.getMethodName() + "-manual", 16010, System.currentTimeMillis())
+        .toString();
+    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
+    this.logDir = new Path(this.rootDir, logName);
+    if (UTIL.getDFSCluster().getFileSystem().exists(this.rootDir)) {
+      UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
+    }
+    this.wals = new WALFactory(conf, TEST_NAME.getMethodName());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    this.wals.close();
+    UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
+  }
+
+  /*
+   * @param p Directory to cleanup
+   */
+  private void deleteDir(final Path p) throws IOException {
+    if (this.fs.exists(p)) {
+      if (!this.fs.delete(p, true)) {
+        throw new IOException("Failed remove of " + p);
+      }
+    }
+  }
+
+  private TableDescriptor createBasic3FamilyTD(final TableName tableName) throws IOException {
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
+    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("a")).build());
+    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("b")).build());
+    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("c")).build());
+    TableDescriptor td = builder.build();
+    UTIL.getAdmin().createTable(td);
+    return td;
+  }
+
+  private WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
+    FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c);
+    wal.init();
+    return wal;
+  }
+
+  /**
+   * Test writing edits into an HRegion, closing it, splitting logs, opening
+   * Region again. Verify seqids.
+   */
+  @Test
+  public void testReplayEditsWrittenViaHRegion()
+      throws IOException, SecurityException, IllegalArgumentException, InterruptedException {
+    final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
+    final TableDescriptor td = createBasic3FamilyTD(tableName);
+    final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
+    final Path basedir = FSUtils.getTableDir(this.rootDir, tableName);
+    deleteDir(basedir);
+    final byte[] rowName = tableName.getName();
+    final int countPerFamily = 10;
+
+    HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
+    HBaseTestingUtility.closeRegionAndWAL(region3);
+    // Write countPerFamily edits into the three families. Do a flush on one of the families
+    // during the load of edits so its seqid is not the same as the others, to verify we do
+    // the right thing when seqids differ.
+    WAL wal = createWAL(this.conf, rootDir, logName);
+    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
+    long seqid = region.getOpenSeqNum();
+    boolean first = true;
+    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+      addRegionEdits(rowName, cfd.getName(), countPerFamily, this.ee, region, "x");
+      if (first) {
+        // If first, flush so we have at least one family with a seqid different from the rest.
+        region.flush(true);
+        first = false;
+      }
+    }
+    // Now assert edits made it in.
+    final Get g = new Get(rowName);
+    Result result = region.get(g);
+    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
+    // Now close the region (without flush), split the log, reopen the region and assert that
+    // replay of log has the correct effect, that our seqids are calculated correctly so
+    // all edits in logs are seen as 'stale'/old.
+    region.close(true);
+    wal.shutdown();
+    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
+    WAL wal2 = createWAL(this.conf, rootDir, logName);
+    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
+    long seqid2 = region2.getOpenSeqNum();
+    assertTrue(seqid + result.size() < seqid2);
+    final Result result1b = region2.get(g);
+    assertEquals(result.size(), result1b.size());
+
+    // Next test. Add more edits, then 'crash' this region by stealing its wal
+    // out from under it and assert that replay of the log adds the edits back
+    // correctly when region is opened again.
+    for (ColumnFamilyDescriptor hcd : td.getColumnFamilies()) {
+      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
+    }
+    // Get count of edits.
+    final Result result2 = region2.get(g);
+    assertEquals(2 * result.size(), result2.size());
+    wal2.sync();
+    final Configuration newConf = HBaseConfiguration.create(this.conf);
+    User user = HBaseTestingUtility.getDifferentUser(newConf, tableName.getNameAsString());
+    user.runAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(conf), conf, wals);
+        FileSystem newFS = FileSystem.get(newConf);
+        // Make a new wal for new region open.
+        WAL wal3 = createWAL(newConf, rootDir, logName);
+        HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, ri, td, null);
+        long seqid3 = region3.initialize();
+        Result result3 = region3.get(g);
+        // Assert that count of cells is same as before crash.
+        assertEquals(result2.size(), result3.size());
+
+        // I can't close wal1. It's been appropriated when we split.
+        region3.close();
+        wal3.close();
+        return null;
+      }
+    });
+  }
+
+  /**
+   * Test that we recover correctly when there is a failure in between the
+   * flushes. i.e. Some stores got flushed but others did not.
+   * Unfortunately, there is no easy hook to flush at a store level. The way
+   * we get around this is by flushing at the region level, and then deleting
+   * the recently flushed store file for one of the Stores. This would put us
+   * back in the situation where all but that store got flushed and the region
+   * died.
+   * We restart Region again, and verify that the edits were replayed.
+   */
+  @Test
+  public void testReplayEditsAfterPartialFlush()
+      throws IOException, SecurityException, IllegalArgumentException {
+    final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
+    final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
+    final Path basedir = FSUtils.getTableDir(this.rootDir, tableName);
+    deleteDir(basedir);
+    final byte[] rowName = tableName.getName();
+    final int countPerFamily = 10;
+    final TableDescriptor td = createBasic3FamilyTD(tableName);
+    HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
+    HBaseTestingUtility.closeRegionAndWAL(region3);
+    // Write countPerFamily edits into the three families.
+    WAL wal = createWAL(this.conf, rootDir, logName);
+    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
+    long seqid = region.getOpenSeqNum();
+    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+      addRegionEdits(rowName, cfd.getName(), countPerFamily, this.ee, region, "x");
+    }
+
+    // Now assert edits made it in.
+    final Get g = new Get(rowName);
+    Result result = region.get(g);
+    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
+
+    // Let us flush the region
+    region.flush(true);
+    region.close(true);
+    wal.shutdown();
+
+    // Delete the store files in the second column family to simulate a failure
+    // in between the flushes, i.e. some stores got flushed but others did not.
+    // We have 3 families; killing the middle one ensures that taking the maximum
+    // will make us fail.
+    int cf_count = 0;
+    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+      cf_count++;
+      if (cf_count == 2) {
+        region.getRegionFileSystem().deleteFamily(cfd.getNameAsString());
+      }
+    }
+
+    // Let us try to split and recover
+    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
+    WAL wal2 = createWAL(this.conf, rootDir, logName);
+    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
+    long seqid2 = region2.getOpenSeqNum();
+    assertTrue(seqid + result.size() < seqid2);
+
+    final Result result1b = region2.get(g);
+    assertEquals(result.size(), result1b.size());
+  }
+
+  /**
+   * Test that we could recover the data correctly after aborting flush. In the
+   * test, first we abort flush after writing some data, then writing more data
+   * and flush again, at last verify the data.
+   */
+  @Test
+  public void testReplayEditsAfterAbortingFlush() throws IOException {
+    final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
+    final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
+    final Path basedir = FSUtils.getTableDir(this.rootDir, tableName);
+    deleteDir(basedir);
+    final TableDescriptor td = createBasic3FamilyTD(tableName);
+    HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
+    HBaseTestingUtility.closeRegionAndWAL(region3);
+    // Write some data; the first flush below will be aborted via the CustomStoreFlusher.
+    WAL wal = createWAL(this.conf, rootDir, logName);
+    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
+    Mockito.doReturn(false).when(rsServices).isAborted();
+    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
+    when(rsServices.getConfiguration()).thenReturn(conf);
+    Configuration customConf = new Configuration(this.conf);
+    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
+      AbstractTestWALReplay.CustomStoreFlusher.class.getName());
+    HRegion region = HRegion.openHRegion(this.rootDir, ri, td, wal, customConf, rsServices, null);
+    int writtenRowCount = 10;
+    List<ColumnFamilyDescriptor> families = Arrays.asList(td.getColumnFamilies());
+    for (int i = 0; i < writtenRowCount; i++) {
+      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
+      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
+        Bytes.toBytes("val"));
+      region.put(put);
+    }
+
+    // Now assert edits made it in.
+    RegionScanner scanner = region.getScanner(new Scan());
+    assertEquals(writtenRowCount, getScannedCount(scanner));
+
+    // Let us flush the region
+    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
+    try {
+      region.flush(true);
+      fail("Injected exception hasn't been thrown");
+    } catch (IOException e) {
+      LOG.info("Expected simulated exception when flushing region, {}", e.getMessage());
+      // simulated to abort server
+      Mockito.doReturn(true).when(rsServices).isAborted();
+      region.setClosing(false); // region normally does not accept writes after
+      // DroppedSnapshotException. We mock around it for this test.
+    }
+    // writing more data
+    int moreRow = 10;
+    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
+      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
+      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
+        Bytes.toBytes("val"));
+      region.put(put);
+    }
+    writtenRowCount += moreRow;
+    // call flush again
+    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
+    try {
+      region.flush(true);
+    } catch (IOException t) {
+      LOG.info(
+        "Expected exception when flushing region because server is stopped, " + t.getMessage());
+    }
+
+    region.close(true);
+    wal.shutdown();
+
+    // Let us try to split and recover
+    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
+    WAL wal2 = createWAL(this.conf, rootDir, logName);
+    Mockito.doReturn(false).when(rsServices).isAborted();
+    HRegion region2 = HRegion.openHRegion(this.rootDir, ri, td, wal2, this.conf, rsServices, null);
+    scanner = region2.getScanner(new Scan());
+    assertEquals(writtenRowCount, getScannedCount(scanner));
+  }
+
+  private int getScannedCount(RegionScanner scanner) throws IOException {
+    int scannedCount = 0;
+    List<Cell> results = new ArrayList<>();
+    while (true) {
+      boolean existMore = scanner.next(results);
+      if (!results.isEmpty()) {
+        scannedCount++;
+      }
+      if (!existMore) {
+        break;
+      }
+      results.clear();
+    }
+    return scannedCount;
+  }
+}
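A minimal sketch of how this feature is exercised, mirroring TestWALSplitToHFile above. The
variables rootDir, logDir, oldLogDir, and wals are assumptions standing in for a concrete
deployment's paths and WALFactory; only WAL_SPLIT_TO_HFILE, WALSplitter.split, and
WALSplitUtil.getSeqIdForRecoveredHFile come from this patch.

    // A hedged sketch, assuming rootDir/logDir/oldLogDir/wals are set up as in the test above.
    Configuration conf = HBaseConfiguration.create();
    // Opt in; the flag defaults to false (DEFAULT_WAL_SPLIT_TO_HFILE).
    conf.setBoolean(BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE, true);
    // With the flag set, WALSplitter wires up BoundedEntryBuffers plus
    // BoundedRecoveredHFilesOutputSink, writing one HFile per column family per region
    // under the family's recovered.hfiles directory instead of recovered.edits files.
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(conf), conf, wals);
    // On the next open, HRegion#loadRecoveredHFilesIfAny bulk-loads those files and recovers
    // the max sequence id from the file name (seqId zero-padded to 19 digits, then the WAL name):
    long seqId = WALSplitUtil.getSeqIdForRecoveredHFile("0000000000000002332-wal");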