HBASE-23286 Improve MTTR: Split WAL to HFile (#820)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
94346d8623
commit
def9ac7c45
|
@ -51,11 +51,19 @@ public final class HConstants {
|
|||
|
||||
/** Used as a magic return value while optimized index key feature enabled(HBASE-7845) */
|
||||
public final static int INDEX_KEY_MAGIC = -2;
|
||||
|
||||
/*
|
||||
* Name of directory that holds recovered edits written by the wal log
|
||||
* splitting code, one per region
|
||||
*/
|
||||
public static final String RECOVERED_EDITS_DIR = "recovered.edits";
|
||||
|
||||
/*
|
||||
* Name of directory that holds recovered hfiles written by the wal log
|
||||
* splitting code, one per region
|
||||
*/
|
||||
public static final String RECOVERED_HFILES_DIR = "recovered.hfiles";
|
||||
|
||||
/**
|
||||
* The first four bytes of Hadoop RPC connections
|
||||
*/
|
||||
|
|
|
@ -52,7 +52,7 @@ public class CellSet implements NavigableSet<Cell> {
|
|||
|
||||
private final int numUniqueKeys;
|
||||
|
||||
CellSet(final CellComparator c) {
|
||||
public CellSet(final CellComparator c) {
|
||||
this.delegatee = new ConcurrentSkipListMap<>(c.getSimpleComparator());
|
||||
this.numUniqueKeys = UNKNOWN_NUM_UNIQUES;
|
||||
}
|
||||
|
|
|
@ -979,6 +979,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// Recover any edits if available.
|
||||
maxSeqId = Math.max(maxSeqId,
|
||||
replayRecoveredEditsIfAny(maxSeqIdInStores, reporter, status));
|
||||
// Recover any hfiles if available
|
||||
maxSeqId = Math.max(maxSeqId, loadRecoveredHFilesIfAny(stores));
|
||||
// Make sure mvcc is up to max.
|
||||
this.mvcc.advanceTo(maxSeqId);
|
||||
} finally {
|
||||
|
@ -5375,6 +5377,32 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
}
|
||||
|
||||
private long loadRecoveredHFilesIfAny(Collection<HStore> stores) throws IOException {
|
||||
Path regionDir = getWALRegionDir();
|
||||
long maxSeqId = -1;
|
||||
for (HStore store : stores) {
|
||||
String familyName = store.getColumnFamilyName();
|
||||
FileStatus[] files =
|
||||
WALSplitUtil.getRecoveredHFiles(fs.getFileSystem(), regionDir, familyName);
|
||||
if (files != null && files.length != 0) {
|
||||
for (FileStatus file : files) {
|
||||
store.assertBulkLoadHFileOk(file.getPath());
|
||||
Pair<Path, Path> pair = store.preBulkLoadHFile(file.getPath().toString(), -1);
|
||||
store.bulkLoadHFile(Bytes.toBytes(familyName), pair.getFirst().toString(),
|
||||
pair.getSecond());
|
||||
maxSeqId =
|
||||
Math.max(maxSeqId, WALSplitUtil.getSeqIdForRecoveredHFile(file.getPath().getName()));
|
||||
}
|
||||
if (this.rsServices != null && store.needsCompaction()) {
|
||||
this.rsServices.getCompactionRequestor()
|
||||
.requestCompaction(this, store, "load recovered hfiles request compaction",
|
||||
Store.PRIORITY_USER + 1, CompactionLifeCycleTracker.DUMMY, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
return maxSeqId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Be careful, this method will drop all data in the memstore of this region.
|
||||
* Currently, this method is used to drop memstore to prevent memory leak
|
||||
|
|
|
@ -1045,7 +1045,12 @@ public abstract class FSUtils extends CommonFSUtils {
|
|||
}
|
||||
|
||||
public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
|
||||
return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
|
||||
return getRegionDirFromTableDir(tableDir,
|
||||
ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
|
||||
}
|
||||
|
||||
public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) {
|
||||
return new Path(tableDir, encodedRegionName);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -23,11 +23,13 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.MultipleIOException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -45,7 +47,7 @@ class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
|
|||
|
||||
// Since the splitting process may create multiple output files, we need a map
|
||||
// to track the output count of each region.
|
||||
private ConcurrentHashMap<byte[], Long> regionEditsWrittenMap = new ConcurrentHashMap<>();
|
||||
private ConcurrentMap<String, Long> regionEditsWrittenMap = new ConcurrentHashMap<>();
|
||||
// Need a counter to track the opening writers.
|
||||
private final AtomicInteger openingWritersNum = new AtomicInteger(0);
|
||||
|
||||
|
@ -68,7 +70,7 @@ class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
|
|||
if (writer != null) {
|
||||
openingWritersNum.incrementAndGet();
|
||||
writer.writeRegionEntries(entries);
|
||||
regionEditsWrittenMap.compute(buffer.encodedRegionName,
|
||||
regionEditsWrittenMap.compute(Bytes.toString(buffer.encodedRegionName),
|
||||
(k, v) -> v == null ? writer.editsWritten : v + writer.editsWritten);
|
||||
List<IOException> thrown = new ArrayList<>();
|
||||
Path dst = closeRecoveredEditsWriter(writer, thrown);
|
||||
|
@ -125,7 +127,7 @@ class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Map<byte[], Long> getOutputCounts() {
|
||||
public Map<String, Long> getOutputCounts() {
|
||||
return regionEditsWrittenMap;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,240 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.wal;
|
||||
|
||||
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.CellComparatorImpl;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||
import org.apache.hadoop.hbase.regionserver.CellSet;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.wal.EntryBuffers.RegionEntryBuffer;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class BoundedRecoveredHFilesOutputSink extends OutputSink {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(BoundedRecoveredHFilesOutputSink.class);
|
||||
|
||||
public static final String WAL_SPLIT_TO_HFILE = "hbase.wal.split.to.hfile";
|
||||
public static final boolean DEFAULT_WAL_SPLIT_TO_HFILE = false;
|
||||
|
||||
private final WALSplitter walSplitter;
|
||||
private final Map<TableName, TableDescriptor> tableDescCache;
|
||||
private Connection connection;
|
||||
private Admin admin;
|
||||
private FileSystem rootFS;
|
||||
|
||||
// Since the splitting process may create multiple output files, we need a map
|
||||
// to track the output count of each region.
|
||||
private ConcurrentMap<String, Long> regionEditsWrittenMap = new ConcurrentHashMap<>();
|
||||
// Need a counter to track the opening writers.
|
||||
private final AtomicInteger openingWritersNum = new AtomicInteger(0);
|
||||
|
||||
public BoundedRecoveredHFilesOutputSink(WALSplitter walSplitter,
|
||||
WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
|
||||
super(controller, entryBuffers, numWriters);
|
||||
this.walSplitter = walSplitter;
|
||||
tableDescCache = new HashMap<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
void startWriterThreads() throws IOException {
|
||||
connection = ConnectionFactory.createConnection(walSplitter.conf);
|
||||
admin = connection.getAdmin();
|
||||
rootFS = FSUtils.getRootDirFileSystem(walSplitter.conf);
|
||||
super.startWriterThreads();
|
||||
}
|
||||
|
||||
@Override
|
||||
void append(RegionEntryBuffer buffer) throws IOException {
|
||||
Map<String, CellSet> familyCells = new HashMap<>();
|
||||
Map<String, Long> familySeqIds = new HashMap<>();
|
||||
boolean isMetaTable = buffer.tableName.equals(META_TABLE_NAME);
|
||||
for (WAL.Entry entry : buffer.entryBuffer) {
|
||||
long seqId = entry.getKey().getSequenceId();
|
||||
List<Cell> cells = entry.getEdit().getCells();
|
||||
for (Cell cell : cells) {
|
||||
if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
|
||||
continue;
|
||||
}
|
||||
String familyName = Bytes.toString(CellUtil.cloneFamily(cell));
|
||||
// comparator need to be specified for meta
|
||||
familyCells.computeIfAbsent(familyName, key -> new CellSet(
|
||||
isMetaTable ? CellComparatorImpl.META_COMPARATOR : CellComparator.getInstance()))
|
||||
.add(cell);
|
||||
familySeqIds.compute(familyName, (k, v) -> v == null ? seqId : Math.max(v, seqId));
|
||||
}
|
||||
}
|
||||
|
||||
// The key point is create a new writer for each column family, write edits then close writer.
|
||||
String regionName = Bytes.toString(buffer.encodedRegionName);
|
||||
for (Map.Entry<String, CellSet> cellsEntry : familyCells.entrySet()) {
|
||||
String familyName = cellsEntry.getKey();
|
||||
StoreFileWriter writer = createRecoveredHFileWriter(buffer.tableName, regionName,
|
||||
familySeqIds.get(familyName), familyName, isMetaTable);
|
||||
openingWritersNum.incrementAndGet();
|
||||
try {
|
||||
for (Cell cell : cellsEntry.getValue()) {
|
||||
writer.append(cell);
|
||||
}
|
||||
regionEditsWrittenMap.compute(Bytes.toString(buffer.encodedRegionName),
|
||||
(k, v) -> v == null ? buffer.entryBuffer.size() : v + buffer.entryBuffer.size());
|
||||
splits.add(writer.getPath());
|
||||
openingWritersNum.decrementAndGet();
|
||||
} finally {
|
||||
writer.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Path> close() throws IOException {
|
||||
boolean isSuccessful = true;
|
||||
try {
|
||||
isSuccessful &= finishWriterThreads();
|
||||
} finally {
|
||||
isSuccessful &= writeRemainingEntryBuffers();
|
||||
}
|
||||
IOUtils.closeQuietly(admin);
|
||||
IOUtils.closeQuietly(connection);
|
||||
return isSuccessful ? splits : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write out the remaining RegionEntryBuffers and close the writers.
|
||||
*
|
||||
* @return true when there is no error.
|
||||
*/
|
||||
private boolean writeRemainingEntryBuffers() throws IOException {
|
||||
for (EntryBuffers.RegionEntryBuffer buffer : entryBuffers.buffers.values()) {
|
||||
closeCompletionService.submit(() -> {
|
||||
append(buffer);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
boolean progressFailed = false;
|
||||
try {
|
||||
for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
|
||||
Future<Void> future = closeCompletionService.take();
|
||||
future.get();
|
||||
if (!progressFailed && reporter != null && !reporter.progress()) {
|
||||
progressFailed = true;
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
IOException iie = new InterruptedIOException();
|
||||
iie.initCause(e);
|
||||
throw iie;
|
||||
} catch (ExecutionException e) {
|
||||
throw new IOException(e.getCause());
|
||||
} finally {
|
||||
closeThreadPool.shutdownNow();
|
||||
}
|
||||
return !progressFailed;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Long> getOutputCounts() {
|
||||
return regionEditsWrittenMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumberOfRecoveredRegions() {
|
||||
return regionEditsWrittenMap.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
int getNumOpenWriters() {
|
||||
return openingWritersNum.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean keepRegionEvent(Entry entry) {
|
||||
return false;
|
||||
}
|
||||
|
||||
private StoreFileWriter createRecoveredHFileWriter(TableName tableName, String regionName,
|
||||
long seqId, String familyName, boolean isMetaTable) throws IOException {
|
||||
Path outputFile = WALSplitUtil
|
||||
.getRegionRecoveredHFilePath(tableName, regionName, familyName, seqId,
|
||||
walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.conf, rootFS);
|
||||
checkPathValid(outputFile);
|
||||
StoreFileWriter.Builder writerBuilder =
|
||||
new StoreFileWriter.Builder(walSplitter.conf, CacheConfig.DISABLED, rootFS)
|
||||
.withFilePath(outputFile);
|
||||
HFileContextBuilder hFileContextBuilder = new HFileContextBuilder();
|
||||
if (isMetaTable) {
|
||||
writerBuilder.withComparator(CellComparatorImpl.META_COMPARATOR);
|
||||
} else {
|
||||
configContextForNonMetaWriter(tableName, familyName, hFileContextBuilder, writerBuilder);
|
||||
}
|
||||
return writerBuilder.withFileContext(hFileContextBuilder.build()).build();
|
||||
}
|
||||
|
||||
private void configContextForNonMetaWriter(TableName tableName, String familyName,
|
||||
HFileContextBuilder hFileContextBuilder, StoreFileWriter.Builder writerBuilder)
|
||||
throws IOException {
|
||||
if (!tableDescCache.containsKey(tableName)) {
|
||||
tableDescCache.put(tableName, admin.getDescriptor(tableName));
|
||||
}
|
||||
TableDescriptor tableDesc = tableDescCache.get(tableName);
|
||||
ColumnFamilyDescriptor cfd = tableDesc.getColumnFamily(Bytes.toBytesBinary(familyName));
|
||||
hFileContextBuilder.withCompression(cfd.getCompressionType()).withBlockSize(cfd.getBlocksize())
|
||||
.withCompressTags(cfd.isCompressTags()).withDataBlockEncoding(cfd.getDataBlockEncoding());
|
||||
writerBuilder.withBloomType(cfd.getBloomFilterType())
|
||||
.withComparator(CellComparatorImpl.COMPARATOR);
|
||||
}
|
||||
|
||||
private void checkPathValid(Path outputFile) throws IOException {
|
||||
if (rootFS.exists(outputFile)) {
|
||||
LOG.warn("this file {} may be left after last failed split ", outputFile);
|
||||
if (!rootFS.delete(outputFile, false)) {
|
||||
LOG.warn("delete old generated HFile {} failed", outputFile);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -81,7 +81,7 @@ abstract class OutputSink {
|
|||
/**
|
||||
* Start the threads that will pump data from the entryBuffers to the output files.
|
||||
*/
|
||||
synchronized void startWriterThreads() {
|
||||
void startWriterThreads() throws IOException {
|
||||
for (int i = 0; i < numThreads; i++) {
|
||||
WriterThread t = new WriterThread(controller, entryBuffers, this, i);
|
||||
t.start();
|
||||
|
@ -137,7 +137,7 @@ abstract class OutputSink {
|
|||
/**
|
||||
* @return a map from encoded region ID to the number of edits written out for that region.
|
||||
*/
|
||||
abstract Map<byte[], Long> getOutputCounts();
|
||||
abstract Map<String, Long> getOutputCounts();
|
||||
|
||||
/**
|
||||
* @return number of regions we've recovered
|
||||
|
|
|
@ -135,10 +135,10 @@ class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Map<byte[], Long> getOutputCounts() {
|
||||
TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||
public Map<String, Long> getOutputCounts() {
|
||||
TreeMap<String, Long> ret = new TreeMap<>();
|
||||
for (Map.Entry<String, RecoveredEditsWriter> entry : writers.entrySet()) {
|
||||
ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
|
||||
ret.put(entry.getKey(), entry.getValue().editsWritten);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.TreeSet;
|
|||
import java.util.UUID;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.lang3.ArrayUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
|
@ -165,7 +166,7 @@ public final class WALSplitUtil {
|
|||
* RECOVERED_EDITS_DIR under the region creating it if necessary.
|
||||
* @param tableName the table name
|
||||
* @param encodedRegionName the encoded region name
|
||||
* @param sedId the sequence id which used to generate file name
|
||||
* @param seqId the sequence id which used to generate file name
|
||||
* @param fileNameBeingSplit the file being split currently. Used to generate tmp file name.
|
||||
* @param tmpDirName of the directory used to sideline old recovered edits file
|
||||
* @param conf configuration
|
||||
|
@ -174,7 +175,7 @@ public final class WALSplitUtil {
|
|||
*/
|
||||
@SuppressWarnings("deprecation")
|
||||
@VisibleForTesting
|
||||
static Path getRegionSplitEditsPath(TableName tableName, byte[] encodedRegionName, long sedId,
|
||||
static Path getRegionSplitEditsPath(TableName tableName, byte[] encodedRegionName, long seqId,
|
||||
String fileNameBeingSplit, String tmpDirName, Configuration conf) throws IOException {
|
||||
FileSystem walFS = FSUtils.getWALFileSystem(conf);
|
||||
Path tableDir = FSUtils.getWALTableDir(conf, tableName);
|
||||
|
@ -203,7 +204,7 @@ public final class WALSplitUtil {
|
|||
// Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
|
||||
// Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
|
||||
// region's replayRecoveredEdits will not delete it
|
||||
String fileName = formatRecoveredEditsFileName(sedId);
|
||||
String fileName = formatRecoveredEditsFileName(seqId);
|
||||
fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit);
|
||||
return new Path(dir, fileName);
|
||||
}
|
||||
|
@ -563,4 +564,58 @@ public final class WALSplitUtil {
|
|||
|
||||
return mutations;
|
||||
}
|
||||
|
||||
/**
|
||||
* Path to a file under recovered.hfiles directory of the region's column family: e.g.
|
||||
* /hbase/some_table/2323432434/cf/recovered.hfiles/2332-wal. This method also ensures existence
|
||||
* of recovered.hfiles directory under the region's column family, creating it if necessary.
|
||||
*
|
||||
* @param tableName the table name
|
||||
* @param encodedRegionName the encoded region name
|
||||
* @param familyName the column family name
|
||||
* @param seqId the sequence id which used to generate file name
|
||||
* @param fileNameBeingSplit the file being split currently. Used to generate tmp file name
|
||||
* @param conf configuration
|
||||
* @param rootFS the root file system
|
||||
* @return Path to file into which to dump split log edits.
|
||||
*/
|
||||
static Path getRegionRecoveredHFilePath(TableName tableName, String encodedRegionName,
|
||||
String familyName, long seqId, String fileNameBeingSplit, Configuration conf, FileSystem rootFS)
|
||||
throws IOException {
|
||||
Path rootDir = FSUtils.getRootDir(conf);
|
||||
Path regionDir =
|
||||
FSUtils.getRegionDirFromTableDir(FSUtils.getTableDir(rootDir, tableName), encodedRegionName);
|
||||
Path dir = getStoreDirRecoveredHFilesDir(regionDir, familyName);
|
||||
|
||||
if (!rootFS.exists(dir) && !rootFS.mkdirs(dir)) {
|
||||
LOG.warn("mkdir failed on {}, region {}, column family {}", dir, encodedRegionName,
|
||||
familyName);
|
||||
}
|
||||
|
||||
String fileName = formatRecoveredHFileName(seqId, fileNameBeingSplit);
|
||||
return new Path(dir, fileName);
|
||||
}
|
||||
|
||||
private static String formatRecoveredHFileName(long seqId, String fileNameBeingSplit) {
|
||||
return String.format("%019d", seqId) + "-" + fileNameBeingSplit;
|
||||
}
|
||||
|
||||
public static long getSeqIdForRecoveredHFile(String fileName) {
|
||||
return Long.parseLong(fileName.split("-")[0]);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param regionDir This regions directory in the filesystem
|
||||
* @param familyName The column family name
|
||||
* @return The directory that holds recovered hfiles for the region's column family
|
||||
*/
|
||||
private static Path getStoreDirRecoveredHFilesDir(final Path regionDir, String familyName) {
|
||||
return new Path(new Path(regionDir, familyName), HConstants.RECOVERED_HFILES_DIR);
|
||||
}
|
||||
|
||||
public static FileStatus[] getRecoveredHFiles(final FileSystem rootFS,
|
||||
final Path regionDir, String familyName) throws IOException {
|
||||
Path dir = getStoreDirRecoveredHFilesDir(regionDir, familyName);
|
||||
return FSUtils.listStatus(rootFS, dir);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.wal;
|
||||
|
||||
import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.DEFAULT_WAL_SPLIT_TO_HFILE;
|
||||
import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE;
|
||||
import static org.apache.hadoop.hbase.wal.WALSplitUtil.finishSplitLogFile;
|
||||
|
||||
import java.io.EOFException;
|
||||
|
@ -129,16 +131,21 @@ public class WALSplitter {
|
|||
|
||||
// if we limit the number of writers opened for sinking recovered edits
|
||||
boolean splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
|
||||
boolean splitToHFile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE);
|
||||
long bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024);
|
||||
int numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3);
|
||||
if (splitWriterCreationBounded) {
|
||||
|
||||
if (splitToHFile) {
|
||||
entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
|
||||
outputSink =
|
||||
new BoundedRecoveredHFilesOutputSink(this, controller, entryBuffers, numWriterThreads);
|
||||
} else if (splitWriterCreationBounded) {
|
||||
entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
|
||||
outputSink =
|
||||
new BoundedRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
|
||||
} else {
|
||||
entryBuffers = new EntryBuffers(controller, bufferSize);
|
||||
outputSink =
|
||||
new RecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
|
||||
outputSink = new RecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -642,7 +642,7 @@ public abstract class AbstractTestWALReplay {
|
|||
// Only throws exception if throwExceptionWhenFlushing is set true.
|
||||
public static class CustomStoreFlusher extends DefaultStoreFlusher {
|
||||
// Switch between throw and not throw exception in flush
|
||||
static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
|
||||
public static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
|
||||
|
||||
public CustomStoreFlusher(Configuration conf, HStore store) {
|
||||
super(conf, store);
|
||||
|
@ -1173,7 +1173,7 @@ public abstract class AbstractTestWALReplay {
|
|||
wal.sync();
|
||||
}
|
||||
|
||||
static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count,
|
||||
public static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count,
|
||||
EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException {
|
||||
List<Put> puts = new ArrayList<>();
|
||||
for (int j = 0; j < count; j++) {
|
||||
|
|
|
@ -1064,11 +1064,10 @@ public class TestWALSplit {
|
|||
logSplitter.splitLogFile(fs.getFileStatus(logPath), null);
|
||||
|
||||
// Verify number of written edits per region
|
||||
Map<byte[], Long> outputCounts = logSplitter.outputSink.getOutputCounts();
|
||||
for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
|
||||
LOG.info("Got " + entry.getValue() + " output edits for region " +
|
||||
Bytes.toString(entry.getKey()));
|
||||
assertEquals((long)entry.getValue(), numFakeEdits / regions.size());
|
||||
Map<String, Long> outputCounts = logSplitter.outputSink.getOutputCounts();
|
||||
for (Map.Entry<String, Long> entry : outputCounts.entrySet()) {
|
||||
LOG.info("Got " + entry.getValue() + " output edits for region " + entry.getKey());
|
||||
assertEquals((long) entry.getValue(), numFakeEdits / regions.size());
|
||||
}
|
||||
assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size());
|
||||
}
|
||||
|
|
|
@ -0,0 +1,408 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.wal;
|
||||
|
||||
import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits;
|
||||
import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdge;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import org.mockito.Mockito;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Category({ RegionServerTests.class, LargeTests.class })
|
||||
public class TestWALSplitToHFile {
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestWALSplitToHFile.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AbstractTestWALReplay.class);
|
||||
static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
|
||||
private Path rootDir = null;
|
||||
private String logName;
|
||||
private Path oldLogDir;
|
||||
private Path logDir;
|
||||
private FileSystem fs;
|
||||
private Configuration conf;
|
||||
private WALFactory wals;
|
||||
|
||||
@Rule
|
||||
public final TestName TEST_NAME = new TestName();
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
conf.setBoolean(WAL_SPLIT_TO_HFILE, true);
|
||||
UTIL.startMiniCluster(3);
|
||||
Path hbaseRootDir = UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
|
||||
LOG.info("hbase.rootdir=" + hbaseRootDir);
|
||||
FSUtils.setRootDir(conf, hbaseRootDir);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
|
||||
this.fs = UTIL.getDFSCluster().getFileSystem();
|
||||
this.rootDir = FSUtils.getRootDir(this.conf);
|
||||
this.oldLogDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
String serverName =
|
||||
ServerName.valueOf(TEST_NAME.getMethodName() + "-manual", 16010, System.currentTimeMillis())
|
||||
.toString();
|
||||
this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
|
||||
this.logDir = new Path(this.rootDir, logName);
|
||||
if (UTIL.getDFSCluster().getFileSystem().exists(this.rootDir)) {
|
||||
UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
|
||||
}
|
||||
this.wals = new WALFactory(conf, TEST_NAME.getMethodName());
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
this.wals.close();
|
||||
UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
|
||||
}
|
||||
|
||||
/*
|
||||
* @param p Directory to cleanup
|
||||
*/
|
||||
private void deleteDir(final Path p) throws IOException {
|
||||
if (this.fs.exists(p)) {
|
||||
if (!this.fs.delete(p, true)) {
|
||||
throw new IOException("Failed remove of " + p);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private TableDescriptor createBasic3FamilyTD(final TableName tableName) throws IOException {
|
||||
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
|
||||
builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("a")).build());
|
||||
builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("b")).build());
|
||||
builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("c")).build());
|
||||
TableDescriptor td = builder.build();
|
||||
UTIL.getAdmin().createTable(td);
|
||||
return td;
|
||||
}
|
||||
|
||||
private WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
|
||||
FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c);
|
||||
wal.init();
|
||||
return wal;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test writing edits into an HRegion, closing it, splitting logs, opening
|
||||
* Region again. Verify seqids.
|
||||
*/
|
||||
@Test
|
||||
public void testReplayEditsWrittenViaHRegion()
|
||||
throws IOException, SecurityException, IllegalArgumentException, InterruptedException {
|
||||
final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
|
||||
final TableDescriptor td = createBasic3FamilyTD(tableName);
|
||||
final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
|
||||
final Path basedir = FSUtils.getTableDir(this.rootDir, tableName);
|
||||
deleteDir(basedir);
|
||||
final byte[] rowName = tableName.getName();
|
||||
final int countPerFamily = 10;
|
||||
|
||||
HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
|
||||
HBaseTestingUtility.closeRegionAndWAL(region3);
|
||||
// Write countPerFamily edits into the three families. Do a flush on one
|
||||
// of the families during the load of edits so its seqid is not same as
|
||||
// others to test we do right thing when different seqids.
|
||||
WAL wal = createWAL(this.conf, rootDir, logName);
|
||||
HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
|
||||
long seqid = region.getOpenSeqNum();
|
||||
boolean first = true;
|
||||
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
|
||||
addRegionEdits(rowName, cfd.getName(), countPerFamily, this.ee, region, "x");
|
||||
if (first) {
|
||||
// If first, so we have at least one family w/ different seqid to rest.
|
||||
region.flush(true);
|
||||
first = false;
|
||||
}
|
||||
}
|
||||
// Now assert edits made it in.
|
||||
final Get g = new Get(rowName);
|
||||
Result result = region.get(g);
|
||||
assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
|
||||
// Now close the region (without flush), split the log, reopen the region and assert that
|
||||
// replay of log has the correct effect, that our seqids are calculated correctly so
|
||||
// all edits in logs are seen as 'stale'/old.
|
||||
region.close(true);
|
||||
wal.shutdown();
|
||||
WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
|
||||
WAL wal2 = createWAL(this.conf, rootDir, logName);
|
||||
HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
|
||||
long seqid2 = region2.getOpenSeqNum();
|
||||
assertTrue(seqid + result.size() < seqid2);
|
||||
final Result result1b = region2.get(g);
|
||||
assertEquals(result.size(), result1b.size());
|
||||
|
||||
// Next test. Add more edits, then 'crash' this region by stealing its wal
|
||||
// out from under it and assert that replay of the log adds the edits back
|
||||
// correctly when region is opened again.
|
||||
for (ColumnFamilyDescriptor hcd : td.getColumnFamilies()) {
|
||||
addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
|
||||
}
|
||||
// Get count of edits.
|
||||
final Result result2 = region2.get(g);
|
||||
assertEquals(2 * result.size(), result2.size());
|
||||
wal2.sync();
|
||||
final Configuration newConf = HBaseConfiguration.create(this.conf);
|
||||
User user = HBaseTestingUtility.getDifferentUser(newConf, tableName.getNameAsString());
|
||||
user.runAs(new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(conf), conf, wals);
|
||||
FileSystem newFS = FileSystem.get(newConf);
|
||||
// Make a new wal for new region open.
|
||||
WAL wal3 = createWAL(newConf, rootDir, logName);
|
||||
HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, ri, td, null);
|
||||
long seqid3 = region3.initialize();
|
||||
Result result3 = region3.get(g);
|
||||
// Assert that count of cells is same as before crash.
|
||||
assertEquals(result2.size(), result3.size());
|
||||
|
||||
// I can't close wal1. Its been appropriated when we split.
|
||||
region3.close();
|
||||
wal3.close();
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that we recover correctly when there is a failure in between the
|
||||
* flushes. i.e. Some stores got flushed but others did not.
|
||||
* Unfortunately, there is no easy hook to flush at a store level. The way
|
||||
* we get around this is by flushing at the region level, and then deleting
|
||||
* the recently flushed store file for one of the Stores. This would put us
|
||||
* back in the situation where all but that store got flushed and the region
|
||||
* died.
|
||||
* We restart Region again, and verify that the edits were replayed.
|
||||
*/
|
||||
@Test
|
||||
public void testReplayEditsAfterPartialFlush()
|
||||
throws IOException, SecurityException, IllegalArgumentException {
|
||||
final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
|
||||
final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
|
||||
final Path basedir = FSUtils.getTableDir(this.rootDir, tableName);
|
||||
deleteDir(basedir);
|
||||
final byte[] rowName = tableName.getName();
|
||||
final int countPerFamily = 10;
|
||||
final TableDescriptor td = createBasic3FamilyTD(tableName);
|
||||
HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
|
||||
HBaseTestingUtility.closeRegionAndWAL(region3);
|
||||
// Write countPerFamily edits into the three families. Do a flush on one
|
||||
// of the families during the load of edits so its seqid is not same as
|
||||
// others to test we do right thing when different seqids.
|
||||
WAL wal = createWAL(this.conf, rootDir, logName);
|
||||
HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
|
||||
long seqid = region.getOpenSeqNum();
|
||||
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
|
||||
addRegionEdits(rowName, cfd.getName(), countPerFamily, this.ee, region, "x");
|
||||
}
|
||||
|
||||
// Now assert edits made it in.
|
||||
final Get g = new Get(rowName);
|
||||
Result result = region.get(g);
|
||||
assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
|
||||
|
||||
// Let us flush the region
|
||||
region.flush(true);
|
||||
region.close(true);
|
||||
wal.shutdown();
|
||||
|
||||
// delete the store files in the second column family to simulate a failure
|
||||
// in between the flushcache();
|
||||
// we have 3 families. killing the middle one ensures that taking the maximum
|
||||
// will make us fail.
|
||||
int cf_count = 0;
|
||||
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
|
||||
cf_count++;
|
||||
if (cf_count == 2) {
|
||||
region.getRegionFileSystem().deleteFamily(cfd.getNameAsString());
|
||||
}
|
||||
}
|
||||
|
||||
// Let us try to split and recover
|
||||
WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
|
||||
WAL wal2 = createWAL(this.conf, rootDir, logName);
|
||||
HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
|
||||
long seqid2 = region2.getOpenSeqNum();
|
||||
assertTrue(seqid + result.size() < seqid2);
|
||||
|
||||
final Result result1b = region2.get(g);
|
||||
assertEquals(result.size(), result1b.size());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that we could recover the data correctly after aborting flush. In the
|
||||
* test, first we abort flush after writing some data, then writing more data
|
||||
* and flush again, at last verify the data.
|
||||
*/
|
||||
@Test
|
||||
public void testReplayEditsAfterAbortingFlush() throws IOException {
|
||||
final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
|
||||
final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
|
||||
final Path basedir = FSUtils.getTableDir(this.rootDir, tableName);
|
||||
deleteDir(basedir);
|
||||
final TableDescriptor td = createBasic3FamilyTD(tableName);
|
||||
HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
|
||||
HBaseTestingUtility.closeRegionAndWAL(region3);
|
||||
// Write countPerFamily edits into the three families. Do a flush on one
|
||||
// of the families during the load of edits so its seqid is not same as
|
||||
// others to test we do right thing when different seqids.
|
||||
WAL wal = createWAL(this.conf, rootDir, logName);
|
||||
RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
|
||||
Mockito.doReturn(false).when(rsServices).isAborted();
|
||||
when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
|
||||
when(rsServices.getConfiguration()).thenReturn(conf);
|
||||
Configuration customConf = new Configuration(this.conf);
|
||||
customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
|
||||
AbstractTestWALReplay.CustomStoreFlusher.class.getName());
|
||||
HRegion region = HRegion.openHRegion(this.rootDir, ri, td, wal, customConf, rsServices, null);
|
||||
int writtenRowCount = 10;
|
||||
List<ColumnFamilyDescriptor> families = Arrays.asList(td.getColumnFamilies());
|
||||
for (int i = 0; i < writtenRowCount; i++) {
|
||||
Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
|
||||
put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
|
||||
Bytes.toBytes("val"));
|
||||
region.put(put);
|
||||
}
|
||||
|
||||
// Now assert edits made it in.
|
||||
RegionScanner scanner = region.getScanner(new Scan());
|
||||
assertEquals(writtenRowCount, getScannedCount(scanner));
|
||||
|
||||
// Let us flush the region
|
||||
AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
|
||||
try {
|
||||
region.flush(true);
|
||||
fail("Injected exception hasn't been thrown");
|
||||
} catch (IOException e) {
|
||||
LOG.info("Expected simulated exception when flushing region, {}", e.getMessage());
|
||||
// simulated to abort server
|
||||
Mockito.doReturn(true).when(rsServices).isAborted();
|
||||
region.setClosing(false); // region normally does not accept writes after
|
||||
// DroppedSnapshotException. We mock around it for this test.
|
||||
}
|
||||
// writing more data
|
||||
int moreRow = 10;
|
||||
for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
|
||||
Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
|
||||
put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
|
||||
Bytes.toBytes("val"));
|
||||
region.put(put);
|
||||
}
|
||||
writtenRowCount += moreRow;
|
||||
// call flush again
|
||||
AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
|
||||
try {
|
||||
region.flush(true);
|
||||
} catch (IOException t) {
|
||||
LOG.info(
|
||||
"Expected exception when flushing region because server is stopped," + t.getMessage());
|
||||
}
|
||||
|
||||
region.close(true);
|
||||
wal.shutdown();
|
||||
|
||||
// Let us try to split and recover
|
||||
WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
|
||||
WAL wal2 = createWAL(this.conf, rootDir, logName);
|
||||
Mockito.doReturn(false).when(rsServices).isAborted();
|
||||
HRegion region2 = HRegion.openHRegion(this.rootDir, ri, td, wal2, this.conf, rsServices, null);
|
||||
scanner = region2.getScanner(new Scan());
|
||||
assertEquals(writtenRowCount, getScannedCount(scanner));
|
||||
}
|
||||
|
||||
private int getScannedCount(RegionScanner scanner) throws IOException {
|
||||
int scannedCount = 0;
|
||||
List<Cell> results = new ArrayList<>();
|
||||
while (true) {
|
||||
boolean existMore = scanner.next(results);
|
||||
if (!results.isEmpty()) {
|
||||
scannedCount++;
|
||||
}
|
||||
if (!existMore) {
|
||||
break;
|
||||
}
|
||||
results.clear();
|
||||
}
|
||||
return scannedCount;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue