HBASE-23286 Improve MTTR: Split WAL to HFile (#820)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
@ -52,11 +52,19 @@ public final class HConstants {
/** Used as a magic return value while optimized index key feature enabled(HBASE-7845) */
public final static int INDEX_KEY_MAGIC = -2;
* Name of directory that holds recovered edits written by the wal log
* splitting code, one per region
* Name of directory that holds recovered edits written by the wal log
* splitting code, one per region
public static final String RECOVERED_EDITS_DIR = "recovered.edits";
* Name of directory that holds recovered hfiles written by the wal log
* splitting code, one per region
public static final String RECOVERED_HFILES_DIR = "recovered.hfiles";
* The first four bytes of Hadoop RPC connections
@ -52,7 +52,7 @@ public class CellSet implements NavigableSet<Cell> {
private final int numUniqueKeys;
CellSet(final CellComparator c) {
public CellSet(final CellComparator c) {
this.delegatee = new ConcurrentSkipListMap<>(c.getSimpleComparator());
this.numUniqueKeys = UNKNOWN_NUM_UNIQUES;
@ -975,6 +975,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Recover any edits if available.
maxSeqId = Math.max(maxSeqId,
replayRecoveredEditsIfAny(maxSeqIdInStores, reporter, status));
// Recover any hfiles if available
maxSeqId = Math.max(maxSeqId, loadRecoveredHFilesIfAny(stores));
// Make sure mvcc is up to max.
} finally {
@ -5332,6 +5334,32 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private long loadRecoveredHFilesIfAny(Collection<HStore> stores) throws IOException {
Path regionDir = getWALRegionDir();
long maxSeqId = -1;
for (HStore store : stores) {
String familyName = store.getColumnFamilyName();
FileStatus[] files =
WALSplitUtil.getRecoveredHFiles(fs.getFileSystem(), regionDir, familyName);
if (files != null && files.length != 0) {
for (FileStatus file : files) {
Pair<Path, Path> pair = store.preBulkLoadHFile(file.getPath().toString(), -1);
store.bulkLoadHFile(Bytes.toBytes(familyName), pair.getFirst().toString(),
maxSeqId =
Math.max(maxSeqId, WALSplitUtil.getSeqIdForRecoveredHFile(file.getPath().getName()));
if (this.rsServices != null && store.needsCompaction()) {
.requestCompaction(this, store, "load recovered hfiles request compaction",
Store.PRIORITY_USER + 1, CompactionLifeCycleTracker.DUMMY, null);
return maxSeqId;
* Be careful, this method will drop all data in the memstore of this region.
* Currently, this method is used to drop memstore to prevent memory leak
@ -328,7 +328,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
public Map<byte[], Long> getOutputCounts() {
public Map<String, Long> getOutputCounts() {
return null; // only used in tests
@ -1031,7 +1031,12 @@ public abstract class FSUtils extends CommonFSUtils {
public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
return getRegionDirFromTableDir(tableDir,
public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) {
return new Path(tableDir, encodedRegionName);
@ -23,11 +23,13 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -45,7 +47,7 @@ class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
// Since the splitting process may create multiple output files, we need a map
// to track the output count of each region.
private ConcurrentHashMap<byte[], Long> regionEditsWrittenMap = new ConcurrentHashMap<>();
private ConcurrentMap<String, Long> regionEditsWrittenMap = new ConcurrentHashMap<>();
// Need a counter to track the opening writers.
private final AtomicInteger openingWritersNum = new AtomicInteger(0);
@ -68,7 +70,7 @@ class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
if (writer != null) {
(k, v) -> v == null ? writer.editsWritten : v + writer.editsWritten);
List<IOException> thrown = new ArrayList<>();
Path dst = closeRecoveredEditsWriter(writer, thrown);
@ -125,7 +127,7 @@ class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
public Map<byte[], Long> getOutputCounts() {
public Map<String, Long> getOutputCounts() {
return regionEditsWrittenMap;
@ -0,0 +1,240 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hbase.wal;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.CellSet;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.EntryBuffers.RegionEntryBuffer;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BoundedRecoveredHFilesOutputSink extends OutputSink {
private static final Logger LOG = LoggerFactory.getLogger(BoundedRecoveredHFilesOutputSink.class);
public static final String WAL_SPLIT_TO_HFILE = "hbase.wal.split.to.hfile";
public static final boolean DEFAULT_WAL_SPLIT_TO_HFILE = false;
private final WALSplitter walSplitter;
private final Map<TableName, TableDescriptor> tableDescCache;
private Connection connection;
private Admin admin;
private FileSystem rootFS;
// Since the splitting process may create multiple output files, we need a map
// to track the output count of each region.
private ConcurrentMap<String, Long> regionEditsWrittenMap = new ConcurrentHashMap<>();
// Need a counter to track the opening writers.
private final AtomicInteger openingWritersNum = new AtomicInteger(0);
public BoundedRecoveredHFilesOutputSink(WALSplitter walSplitter,
WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
super(controller, entryBuffers, numWriters);
this.walSplitter = walSplitter;
tableDescCache = new HashMap<>();
public void startWriterThreads() throws IOException {
connection = ConnectionFactory.createConnection(walSplitter.conf);
admin = connection.getAdmin();
rootFS = FSUtils.getRootDirFileSystem(walSplitter.conf);
public void append(RegionEntryBuffer buffer) throws IOException {
Map<String, CellSet> familyCells = new HashMap<>();
Map<String, Long> familySeqIds = new HashMap<>();
boolean isMetaTable = buffer.tableName.equals(META_TABLE_NAME);
for (WAL.Entry entry : buffer.entries) {
long seqId = entry.getKey().getSequenceId();
List<Cell> cells = entry.getEdit().getCells();
for (Cell cell : cells) {
if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
String familyName = Bytes.toString(CellUtil.cloneFamily(cell));
// comparator need to be specified for meta
familyCells.computeIfAbsent(familyName, key -> new CellSet(
isMetaTable ? CellComparatorImpl.META_COMPARATOR : CellComparator.getInstance()))
familySeqIds.compute(familyName, (k, v) -> v == null ? seqId : Math.max(v, seqId));
// The key point is create a new writer for each column family, write edits then close writer.
String regionName = Bytes.toString(buffer.encodedRegionName);
for (Map.Entry<String, CellSet> cellsEntry : familyCells.entrySet()) {
String familyName = cellsEntry.getKey();
StoreFileWriter writer = createRecoveredHFileWriter(buffer.tableName, regionName,
familySeqIds.get(familyName), familyName, isMetaTable);
try {
for (Cell cell : cellsEntry.getValue()) {
(k, v) -> v == null ? buffer.entries.size() : v + buffer.entries.size());
} finally {
public List<Path> close() throws IOException {
boolean isSuccessful = true;
try {
isSuccessful &= finishWriterThreads(false);
} finally {
isSuccessful &= writeRemainingEntryBuffers();
return isSuccessful ? splits : null;
* Write out the remaining RegionEntryBuffers and close the writers.
* @return true when there is no error.
private boolean writeRemainingEntryBuffers() throws IOException {
for (EntryBuffers.RegionEntryBuffer buffer : entryBuffers.buffers.values()) {
closeCompletionService.submit(() -> {
return null;
boolean progressFailed = false;
try {
for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
Future<Void> future = closeCompletionService.take();
if (!progressFailed && reporter != null && !reporter.progress()) {
progressFailed = true;
} catch (InterruptedException e) {
IOException iie = new InterruptedIOException();
throw iie;
} catch (ExecutionException e) {
throw new IOException(e.getCause());
} finally {
return !progressFailed;
public Map<String, Long> getOutputCounts() {
return regionEditsWrittenMap;
public int getNumberOfRecoveredRegions() {
return regionEditsWrittenMap.size();
public int getNumOpenWriters() {
return openingWritersNum.get();
public boolean keepRegionEvent(Entry entry) {
return false;
private StoreFileWriter createRecoveredHFileWriter(TableName tableName, String regionName,
long seqId, String familyName, boolean isMetaTable) throws IOException {
Path outputFile = WALSplitUtil
.getRegionRecoveredHFilePath(tableName, regionName, familyName, seqId,
walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.conf, rootFS);
StoreFileWriter.Builder writerBuilder =
new StoreFileWriter.Builder(walSplitter.conf, CacheConfig.DISABLED, rootFS)
HFileContextBuilder hFileContextBuilder = new HFileContextBuilder();
if (isMetaTable) {
} else {
configContextForNonMetaWriter(tableName, familyName, hFileContextBuilder, writerBuilder);
return writerBuilder.withFileContext(hFileContextBuilder.build()).build();
private void configContextForNonMetaWriter(TableName tableName, String familyName,
HFileContextBuilder hFileContextBuilder, StoreFileWriter.Builder writerBuilder)
throws IOException {
if (!tableDescCache.containsKey(tableName)) {
tableDescCache.put(tableName, admin.getDescriptor(tableName));
TableDescriptor tableDesc = tableDescCache.get(tableName);
ColumnFamilyDescriptor cfd = tableDesc.getColumnFamily(Bytes.toBytesBinary(familyName));
private void checkPathValid(Path outputFile) throws IOException {
if (rootFS.exists(outputFile)) {
LOG.warn("this file {} may be left after last failed split ", outputFile);
if (!rootFS.delete(outputFile, false)) {
LOG.warn("delete old generated HFile {} failed", outputFile);
@ -81,7 +81,7 @@ public abstract class OutputSink {
* Start the threads that will pump data from the entryBuffers to the output files.
public synchronized void startWriterThreads() {
public void startWriterThreads() throws IOException {
for (int i = 0; i < numThreads; i++) {
WriterThread t = new WriterThread(controller, entryBuffers, this, i);
@ -142,7 +142,7 @@ public abstract class OutputSink {
* @return a map from encoded region ID to the number of edits written out for that region.
protected abstract Map<byte[], Long> getOutputCounts();
protected abstract Map<String, Long> getOutputCounts();
* @return number of regions we've recovered
@ -135,10 +135,10 @@ class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
public Map<byte[], Long> getOutputCounts() {
TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
public Map<String, Long> getOutputCounts() {
TreeMap<String, Long> ret = new TreeMap<>();
for (Map.Entry<String, RecoveredEditsWriter> entry : writers.entrySet()) {
ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
ret.put(entry.getKey(), entry.getValue().editsWritten);
return ret;
@ -28,6 +28,7 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
@ -164,7 +165,7 @@ public final class WALSplitUtil {
* RECOVERED_EDITS_DIR under the region creating it if necessary.
* @param tableName the table name
* @param encodedRegionName the encoded region name
* @param sedId the sequence id which used to generate file name
* @param seqId the sequence id which used to generate file name
* @param fileNameBeingSplit the file being split currently. Used to generate tmp file name.
* @param tmpDirName of the directory used to sideline old recovered edits file
* @param conf configuration
@ -173,7 +174,7 @@ public final class WALSplitUtil {
static Path getRegionSplitEditsPath(TableName tableName, byte[] encodedRegionName, long sedId,
static Path getRegionSplitEditsPath(TableName tableName, byte[] encodedRegionName, long seqId,
String fileNameBeingSplit, String tmpDirName, Configuration conf) throws IOException {
FileSystem walFS = FSUtils.getWALFileSystem(conf);
Path tableDir = FSUtils.getWALTableDir(conf, tableName);
@ -202,7 +203,7 @@ public final class WALSplitUtil {
// Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
// Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
// region's replayRecoveredEdits will not delete it
String fileName = formatRecoveredEditsFileName(sedId);
String fileName = formatRecoveredEditsFileName(seqId);
fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit);
return new Path(dir, fileName);
@ -562,4 +563,58 @@ public final class WALSplitUtil {
return mutations;
* Path to a file under recovered.hfiles directory of the region's column family: e.g.
* /hbase/some_table/2323432434/cf/recovered.hfiles/2332-wal. This method also ensures existence
* of recovered.hfiles directory under the region's column family, creating it if necessary.
* @param tableName the table name
* @param encodedRegionName the encoded region name
* @param familyName the column family name
* @param seqId the sequence id which used to generate file name
* @param fileNameBeingSplit the file being split currently. Used to generate tmp file name
* @param conf configuration
* @param rootFS the root file system
* @return Path to file into which to dump split log edits.
static Path getRegionRecoveredHFilePath(TableName tableName, String encodedRegionName,
String familyName, long seqId, String fileNameBeingSplit, Configuration conf, FileSystem rootFS)
throws IOException {
Path rootDir = FSUtils.getRootDir(conf);
Path regionDir =
FSUtils.getRegionDirFromTableDir(FSUtils.getTableDir(rootDir, tableName), encodedRegionName);
Path dir = getStoreDirRecoveredHFilesDir(regionDir, familyName);
if (!rootFS.exists(dir) && !rootFS.mkdirs(dir)) {
LOG.warn("mkdir failed on {}, region {}, column family {}", dir, encodedRegionName,
String fileName = formatRecoveredHFileName(seqId, fileNameBeingSplit);
return new Path(dir, fileName);
private static String formatRecoveredHFileName(long seqId, String fileNameBeingSplit) {
return String.format("%019d", seqId) + "-" + fileNameBeingSplit;
public static long getSeqIdForRecoveredHFile(String fileName) {
return Long.parseLong(fileName.split("-")[0]);
* @param regionDir This regions directory in the filesystem
* @param familyName The column family name
* @return The directory that holds recovered hfiles for the region's column family
private static Path getStoreDirRecoveredHFilesDir(final Path regionDir, String familyName) {
return new Path(new Path(regionDir, familyName), HConstants.RECOVERED_HFILES_DIR);
public static FileStatus[] getRecoveredHFiles(final FileSystem rootFS,
final Path regionDir, String familyName) throws IOException {
Path dir = getStoreDirRecoveredHFilesDir(regionDir, familyName);
return FSUtils.listStatus(rootFS, dir);
@ -17,6 +17,8 @@
package org.apache.hadoop.hbase.wal;
import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.DEFAULT_WAL_SPLIT_TO_HFILE;
import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE;
import static org.apache.hadoop.hbase.wal.WALSplitUtil.finishSplitLogFile;
import java.io.EOFException;
@ -111,7 +113,7 @@ public class WALSplitter {
WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS,
LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination) {
LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination) {
this.conf = HBaseConfiguration.create(conf);
String codecClassName =
conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
@ -128,16 +130,21 @@ public class WALSplitter {
// if we limit the number of writers opened for sinking recovered edits
boolean splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
boolean splitToHFile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE);
long bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024);
int numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3);
if (splitWriterCreationBounded) {
if (splitToHFile) {
entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
outputSink =
new BoundedRecoveredHFilesOutputSink(this, controller, entryBuffers, numWriterThreads);
} else if (splitWriterCreationBounded) {
entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
outputSink =
new BoundedRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
} else {
entryBuffers = new EntryBuffers(controller, bufferSize);
outputSink =
new RecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
outputSink = new RecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
@ -642,7 +642,7 @@ public abstract class AbstractTestWALReplay {
// Only throws exception if throwExceptionWhenFlushing is set true.
public static class CustomStoreFlusher extends DefaultStoreFlusher {
// Switch between throw and not throw exception in flush
static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
public static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
public CustomStoreFlusher(Configuration conf, HStore store) {
super(conf, store);
@ -1173,7 +1173,7 @@ public abstract class AbstractTestWALReplay {
static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count,
public static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count,
EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException {
List<Put> puts = new ArrayList<>();
for (int j = 0; j < count; j++) {
@ -1064,11 +1064,10 @@ public class TestWALSplit {
logSplitter.splitLogFile(fs.getFileStatus(logPath), null);
// Verify number of written edits per region
Map<byte[], Long> outputCounts = logSplitter.outputSink.getOutputCounts();
for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
LOG.info("Got " + entry.getValue() + " output edits for region " +
assertEquals((long)entry.getValue(), numFakeEdits / regions.size());
Map<String, Long> outputCounts = logSplitter.outputSink.getOutputCounts();
for (Map.Entry<String, Long> entry : outputCounts.entrySet()) {
LOG.info("Got " + entry.getValue() + " output edits for region " + entry.getKey());
assertEquals((long) entry.getValue(), numFakeEdits / regions.size());
assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size());
@ -0,0 +1,408 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hbase.wal;
import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits;
import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ RegionServerTests.class, LargeTests.class })
public class TestWALSplitToHFile {
public static final HBaseClassTestRule CLASS_RULE =
private static final Logger LOG = LoggerFactory.getLogger(AbstractTestWALReplay.class);
static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
private Path rootDir = null;
private String logName;
private Path oldLogDir;
private Path logDir;
private FileSystem fs;
private Configuration conf;
private WALFactory wals;
public final TestName TEST_NAME = new TestName();
public static void setUpBeforeClass() throws Exception {
Configuration conf = UTIL.getConfiguration();
conf.setBoolean(WAL_SPLIT_TO_HFILE, true);
Path hbaseRootDir = UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
LOG.info("hbase.rootdir=" + hbaseRootDir);
FSUtils.setRootDir(conf, hbaseRootDir);
public static void tearDownAfterClass() throws Exception {
public void setUp() throws Exception {
this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
this.fs = UTIL.getDFSCluster().getFileSystem();
this.rootDir = FSUtils.getRootDir(this.conf);
this.oldLogDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
String serverName =
ServerName.valueOf(TEST_NAME.getMethodName() + "-manual", 16010, System.currentTimeMillis())
this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
this.logDir = new Path(this.rootDir, logName);
if (UTIL.getDFSCluster().getFileSystem().exists(this.rootDir)) {
UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
this.wals = new WALFactory(conf, TEST_NAME.getMethodName());
public void tearDown() throws Exception {
UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
* @param p Directory to cleanup
private void deleteDir(final Path p) throws IOException {
if (this.fs.exists(p)) {
if (!this.fs.delete(p, true)) {
throw new IOException("Failed remove of " + p);
private TableDescriptor createBasic3FamilyTD(final TableName tableName) throws IOException {
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
TableDescriptor td = builder.build();
return td;
private WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c);
return wal;
* Test writing edits into an HRegion, closing it, splitting logs, opening
* Region again. Verify seqids.
public void testReplayEditsWrittenViaHRegion()
throws IOException, SecurityException, IllegalArgumentException, InterruptedException {
final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
final TableDescriptor td = createBasic3FamilyTD(tableName);
final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
final Path basedir = FSUtils.getTableDir(this.rootDir, tableName);
final byte[] rowName = tableName.getName();
final int countPerFamily = 10;
HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
// Write countPerFamily edits into the three families. Do a flush on one
// of the families during the load of edits so its seqid is not same as
// others to test we do right thing when different seqids.
WAL wal = createWAL(this.conf, rootDir, logName);
HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
long seqid = region.getOpenSeqNum();
boolean first = true;
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
addRegionEdits(rowName, cfd.getName(), countPerFamily, this.ee, region, "x");
if (first) {
// If first, so we have at least one family w/ different seqid to rest.
first = false;
// Now assert edits made it in.
final Get g = new Get(rowName);
Result result = region.get(g);
assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
// Now close the region (without flush), split the log, reopen the region and assert that
// replay of log has the correct effect, that our seqids are calculated correctly so
// all edits in logs are seen as 'stale'/old.
WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
WAL wal2 = createWAL(this.conf, rootDir, logName);
HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
long seqid2 = region2.getOpenSeqNum();
assertTrue(seqid + result.size() < seqid2);
final Result result1b = region2.get(g);
assertEquals(result.size(), result1b.size());
// Next test. Add more edits, then 'crash' this region by stealing its wal
// out from under it and assert that replay of the log adds the edits back
// correctly when region is opened again.
for (ColumnFamilyDescriptor hcd : td.getColumnFamilies()) {
addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
// Get count of edits.
final Result result2 = region2.get(g);
assertEquals(2 * result.size(), result2.size());
final Configuration newConf = HBaseConfiguration.create(this.conf);
User user = HBaseTestingUtility.getDifferentUser(newConf, tableName.getNameAsString());
user.runAs(new PrivilegedExceptionAction<Object>() {
public Object run() throws Exception {
WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(conf), conf, wals);
FileSystem newFS = FileSystem.get(newConf);
// Make a new wal for new region open.
WAL wal3 = createWAL(newConf, rootDir, logName);
HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, ri, td, null);
long seqid3 = region3.initialize();
Result result3 = region3.get(g);
// Assert that count of cells is same as before crash.
assertEquals(result2.size(), result3.size());
// I can't close wal1. Its been appropriated when we split.
return null;
* Test that we recover correctly when there is a failure in between the
* flushes. i.e. Some stores got flushed but others did not.
* Unfortunately, there is no easy hook to flush at a store level. The way
* we get around this is by flushing at the region level, and then deleting
* the recently flushed store file for one of the Stores. This would put us
* back in the situation where all but that store got flushed and the region
* died.
* We restart Region again, and verify that the edits were replayed.
public void testReplayEditsAfterPartialFlush()
throws IOException, SecurityException, IllegalArgumentException {
final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
final Path basedir = FSUtils.getTableDir(this.rootDir, tableName);
final byte[] rowName = tableName.getName();
final int countPerFamily = 10;
final TableDescriptor td = createBasic3FamilyTD(tableName);
HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
// Write countPerFamily edits into the three families. Do a flush on one
// of the families during the load of edits so its seqid is not same as
// others to test we do right thing when different seqids.
WAL wal = createWAL(this.conf, rootDir, logName);
HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
long seqid = region.getOpenSeqNum();
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
addRegionEdits(rowName, cfd.getName(), countPerFamily, this.ee, region, "x");
// Now assert edits made it in.
final Get g = new Get(rowName);
Result result = region.get(g);
assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
// Let us flush the region
// delete the store files in the second column family to simulate a failure
// in between the flushcache();
// we have 3 families. killing the middle one ensures that taking the maximum
// will make us fail.
int cf_count = 0;
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
if (cf_count == 2) {
// Let us try to split and recover
WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
WAL wal2 = createWAL(this.conf, rootDir, logName);
HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
long seqid2 = region2.getOpenSeqNum();
assertTrue(seqid + result.size() < seqid2);
final Result result1b = region2.get(g);
assertEquals(result.size(), result1b.size());
* Test that we could recover the data correctly after aborting flush. In the
* test, first we abort flush after writing some data, then writing more data
* and flush again, at last verify the data.
public void testReplayEditsAfterAbortingFlush() throws IOException {
final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
final Path basedir = FSUtils.getTableDir(this.rootDir, tableName);
final TableDescriptor td = createBasic3FamilyTD(tableName);
HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
// Write countPerFamily edits into the three families. Do a flush on one
// of the families during the load of edits so its seqid is not same as
// others to test we do right thing when different seqids.
WAL wal = createWAL(this.conf, rootDir, logName);
RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
Configuration customConf = new Configuration(this.conf);
HRegion region = HRegion.openHRegion(this.rootDir, ri, td, wal, customConf, rsServices, null);
int writtenRowCount = 10;
List<ColumnFamilyDescriptor> families = Arrays.asList(td.getColumnFamilies());
for (int i = 0; i < writtenRowCount; i++) {
Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
// Now assert edits made it in.
RegionScanner scanner = region.getScanner(new Scan());
assertEquals(writtenRowCount, getScannedCount(scanner));
// Let us flush the region
try {
fail("Injected exception hasn't been thrown");
} catch (IOException e) {
LOG.info("Expected simulated exception when flushing region, {}", e.getMessage());
// simulated to abort server
region.setClosing(false); // region normally does not accept writes after
// DroppedSnapshotException. We mock around it for this test.
// writing more data
int moreRow = 10;
for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
writtenRowCount += moreRow;
// call flush again
try {
} catch (IOException t) {
"Expected exception when flushing region because server is stopped," + t.getMessage());
// Let us try to split and recover
WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
WAL wal2 = createWAL(this.conf, rootDir, logName);
HRegion region2 = HRegion.openHRegion(this.rootDir, ri, td, wal2, this.conf, rsServices, null);
scanner = region2.getScanner(new Scan());
assertEquals(writtenRowCount, getScannedCount(scanner));
private int getScannedCount(RegionScanner scanner) throws IOException {
int scannedCount = 0;
List<Cell> results = new ArrayList<>();
while (true) {
boolean existMore = scanner.next(results);
if (!results.isEmpty()) {
if (!existMore) {
return scannedCount;
Reference in New Issue