HBASE-10958 [dataloss] Bulk loading with seqids can prevent some log entries from being replayed
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1590144 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e1a767105d
commit
dcae5488cd
|
@ -387,6 +387,81 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
|
||||
}
|
||||
|
||||
/**
|
||||
* Objects from this class are created when flushing to describe all the different states that
|
||||
* that method ends up in. The Result enum describes those states. The sequence id should only
|
||||
* be specified if the flush was successful, and the failure message should only be speficied
|
||||
* if it didn't flush.
|
||||
*/
|
||||
public static class FlushResult {
|
||||
enum Result {
|
||||
FLUSHED_NO_COMPACTION_NEEDED,
|
||||
FLUSHED_COMPACTION_NEEDED,
|
||||
// Special case where a flush didn't run because there's nothing in the memstores. Used when
|
||||
// bulk loading to know when we can still load even if a flush didn't happen.
|
||||
CANNOT_FLUSH_MEMSTORE_EMPTY,
|
||||
CANNOT_FLUSH
|
||||
// Be careful adding more to this enum, look at the below methods to make sure
|
||||
}
|
||||
|
||||
final Result result;
|
||||
final String failureReason;
|
||||
final long flushSequenceId;
|
||||
|
||||
/**
|
||||
* Convenience constructor to use when the flush is successful, the failure message is set to
|
||||
* null.
|
||||
* @param result Expecting FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
|
||||
* @param flushSequenceId Generated sequence id that comes right after the edits in the
|
||||
* memstores.
|
||||
*/
|
||||
FlushResult(Result result, long flushSequenceId) {
|
||||
this(result, flushSequenceId, null);
|
||||
assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
|
||||
.FLUSHED_COMPACTION_NEEDED;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience constructor to use when we cannot flush.
|
||||
* @param result Expecting CANNOT_FLUSH_MEMSTORE_EMPTY or CANNOT_FLUSH.
|
||||
* @param failureReason Reason why we couldn't flush.
|
||||
*/
|
||||
FlushResult(Result result, String failureReason) {
|
||||
this(result, -1, failureReason);
|
||||
assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor with all the parameters.
|
||||
* @param result Any of the Result.
|
||||
* @param flushSequenceId Generated sequence id if the memstores were flushed else -1.
|
||||
* @param failureReason Reason why we couldn't flush, or null.
|
||||
*/
|
||||
FlushResult(Result result, long flushSequenceId, String failureReason) {
|
||||
this.result = result;
|
||||
this.flushSequenceId = flushSequenceId;
|
||||
this.failureReason = failureReason;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience method, the equivalent of checking if result is
|
||||
* FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_NO_COMPACTION_NEEDED.
|
||||
* @return true if the memstores were flushed, else false.
|
||||
*/
|
||||
public boolean isFlushSucceeded() {
|
||||
return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
|
||||
.FLUSHED_COMPACTION_NEEDED;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED.
|
||||
* @return True if the flush requested a compaction, else false (doesn't even mean it flushed).
|
||||
*/
|
||||
public boolean isCompactionNeeded() {
|
||||
return result == Result.FLUSHED_COMPACTION_NEEDED;
|
||||
}
|
||||
}
|
||||
|
||||
final WriteState writestate = new WriteState();
|
||||
|
||||
long memstoreFlushSize;
|
||||
|
@ -706,16 +781,13 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
|
||||
Future<HStore> future = completionService.take();
|
||||
HStore store = future.get();
|
||||
|
||||
this.stores.put(store.getColumnFamilyName().getBytes(), store);
|
||||
// Do not include bulk loaded files when determining seqIdForReplay
|
||||
long storeSeqIdForReplay = store.getMaxSequenceId(false);
|
||||
|
||||
long storeMaxSequenceId = store.getMaxSequenceId();
|
||||
maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
|
||||
storeSeqIdForReplay);
|
||||
// Include bulk loaded files when determining seqIdForAssignment
|
||||
long storeSeqIdForAssignment = store.getMaxSequenceId(true);
|
||||
if (maxSeqId == -1 || storeSeqIdForAssignment > maxSeqId) {
|
||||
maxSeqId = storeSeqIdForAssignment;
|
||||
storeMaxSequenceId);
|
||||
if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
|
||||
maxSeqId = storeMaxSequenceId;
|
||||
}
|
||||
long maxStoreMemstoreTS = store.getMaxMemstoreTS();
|
||||
if (maxStoreMemstoreTS > maxMemstoreTS) {
|
||||
|
@ -1461,11 +1533,12 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* @throws DroppedSnapshotException Thrown when replay of hlog is required
|
||||
* because a Snapshot was not properly persisted.
|
||||
*/
|
||||
public boolean flushcache() throws IOException {
|
||||
public FlushResult flushcache() throws IOException {
|
||||
// fail-fast instead of waiting on the lock
|
||||
if (this.closing.get()) {
|
||||
LOG.debug("Skipping flush on " + this + " because closing");
|
||||
return false;
|
||||
String msg = "Skipping flush on " + this + " because closing";
|
||||
LOG.debug(msg);
|
||||
return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
|
||||
}
|
||||
MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
|
||||
status.setStatus("Acquiring readlock on region");
|
||||
|
@ -1473,9 +1546,10 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
lock.readLock().lock();
|
||||
try {
|
||||
if (this.closed.get()) {
|
||||
LOG.debug("Skipping flush on " + this + " because closed");
|
||||
status.abort("Skipped: closed");
|
||||
return false;
|
||||
String msg = "Skipping flush on " + this + " because closed";
|
||||
LOG.debug(msg);
|
||||
status.abort(msg);
|
||||
return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
|
||||
}
|
||||
if (coprocessorHost != null) {
|
||||
status.setStatus("Running coprocessor pre-flush hooks");
|
||||
|
@ -1494,14 +1568,15 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
+ ", flushing=" + writestate.flushing + ", writesEnabled="
|
||||
+ writestate.writesEnabled);
|
||||
}
|
||||
status.abort("Not flushing since "
|
||||
String msg = "Not flushing since "
|
||||
+ (writestate.flushing ? "already flushing"
|
||||
: "writes not enabled"));
|
||||
return false;
|
||||
: "writes not enabled");
|
||||
status.abort(msg);
|
||||
return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
|
||||
}
|
||||
}
|
||||
try {
|
||||
boolean result = internalFlushcache(status);
|
||||
FlushResult fs = internalFlushcache(status);
|
||||
|
||||
if (coprocessorHost != null) {
|
||||
status.setStatus("Running post-flush coprocessor hooks");
|
||||
|
@ -1509,7 +1584,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
}
|
||||
|
||||
status.markComplete("Flush successful");
|
||||
return result;
|
||||
return fs;
|
||||
} finally {
|
||||
synchronized (writestate) {
|
||||
writestate.flushing = false;
|
||||
|
@ -1578,13 +1653,13 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* <p> This method may block for some time.
|
||||
* @param status
|
||||
*
|
||||
* @return true if the region needs compacting
|
||||
* @return object describing the flush's state
|
||||
*
|
||||
* @throws IOException general io exceptions
|
||||
* @throws DroppedSnapshotException Thrown when replay of hlog is required
|
||||
* because a Snapshot was not properly persisted.
|
||||
*/
|
||||
protected boolean internalFlushcache(MonitoredTask status)
|
||||
protected FlushResult internalFlushcache(MonitoredTask status)
|
||||
throws IOException {
|
||||
return internalFlushcache(this.log, -1, status);
|
||||
}
|
||||
|
@ -1598,7 +1673,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* @throws IOException
|
||||
* @see #internalFlushcache(MonitoredTask)
|
||||
*/
|
||||
protected boolean internalFlushcache(
|
||||
protected FlushResult internalFlushcache(
|
||||
final HLog wal, final long myseqid, MonitoredTask status)
|
||||
throws IOException {
|
||||
if (this.rsServices != null && this.rsServices.isAborted()) {
|
||||
|
@ -1609,7 +1684,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
// Clear flush flag.
|
||||
// If nothing to flush, return and avoid logging start/stop flush.
|
||||
if (this.memstoreSize.get() <= 0) {
|
||||
return false;
|
||||
return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush");
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Started memstore flush for " + this +
|
||||
|
@ -1644,9 +1719,10 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
// check if it is not closing.
|
||||
if (wal != null) {
|
||||
if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
|
||||
status.setStatus("Flush will not be started for ["
|
||||
+ this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.");
|
||||
return false;
|
||||
String msg = "Flush will not be started for ["
|
||||
+ this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
|
||||
status.setStatus(msg);
|
||||
return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
|
||||
}
|
||||
flushSeqId = this.sequenceId.incrementAndGet();
|
||||
} else {
|
||||
|
@ -1762,7 +1838,8 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
status.setStatus(msg);
|
||||
this.recentFlushes.add(new Pair<Long,Long>(time/1000, totalFlushableSize));
|
||||
|
||||
return compactionRequested;
|
||||
return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
|
||||
FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushSeqId);
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -3563,6 +3640,22 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
return false;
|
||||
}
|
||||
|
||||
long seqId = -1;
|
||||
// We need to assign a sequential ID that's in between two memstores in order to preserve
|
||||
// the guarantee that all the edits lower than the highest sequential ID from all the
|
||||
// HFiles are flushed on disk. See HBASE-10958.
|
||||
if (assignSeqId) {
|
||||
FlushResult fs = this.flushcache();
|
||||
if (fs.isFlushSucceeded()) {
|
||||
seqId = fs.flushSequenceId;
|
||||
} else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
|
||||
seqId = this.sequenceId.incrementAndGet();
|
||||
} else {
|
||||
throw new IOException("Could not bulk load with an assigned sequential ID because the " +
|
||||
"flush didn't run. Reason for not flushing: " + fs.failureReason);
|
||||
}
|
||||
}
|
||||
|
||||
for (Pair<byte[], String> p : familyPaths) {
|
||||
byte[] familyName = p.getFirst();
|
||||
String path = p.getSecond();
|
||||
|
@ -3572,7 +3665,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
if(bulkLoadListener != null) {
|
||||
finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
|
||||
}
|
||||
store.bulkLoadHFile(finalPath, assignSeqId ? this.sequenceId.incrementAndGet() : -1);
|
||||
store.bulkLoadHFile(finalPath, seqId);
|
||||
if(bulkLoadListener != null) {
|
||||
bulkLoadListener.doneBulkLoad(familyName, path);
|
||||
}
|
||||
|
|
|
@ -433,8 +433,8 @@ public class HStore implements Store {
|
|||
/**
|
||||
* @return The maximum sequence id in all store files. Used for log replay.
|
||||
*/
|
||||
long getMaxSequenceId(boolean includeBulkFiles) {
|
||||
return StoreFile.getMaxSequenceIdInList(this.getStorefiles(), includeBulkFiles);
|
||||
long getMaxSequenceId() {
|
||||
return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -475,7 +475,7 @@ class MemStoreFlusher implements FlushRequester {
|
|||
lock.readLock().lock();
|
||||
try {
|
||||
notifyFlushRequest(region, emergencyFlush);
|
||||
boolean shouldCompact = region.flushcache();
|
||||
boolean shouldCompact = region.flushcache().isCompactionNeeded();
|
||||
// We just want to check the size
|
||||
boolean shouldSplit = region.checkSplit() != null;
|
||||
if (shouldSplit) {
|
||||
|
|
|
@ -978,7 +978,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
}
|
||||
FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
|
||||
if (shouldFlush) {
|
||||
boolean result = region.flushcache();
|
||||
boolean result = region.flushcache().isCompactionNeeded();
|
||||
if (result) {
|
||||
regionServer.compactSplitThread.requestSystemCompaction(region,
|
||||
"Compaction through user triggered flush");
|
||||
|
|
|
@ -300,22 +300,16 @@ public class StoreFile {
|
|||
|
||||
/**
|
||||
* Return the highest sequence ID found across all storefiles in
|
||||
* the given list. Store files that were created by a mapreduce
|
||||
* bulk load are ignored, as they do not correspond to any edit
|
||||
* log items.
|
||||
* the given list.
|
||||
* @param sfs
|
||||
* @param includeBulkLoadedFiles
|
||||
* @return 0 if no non-bulk-load files are provided or, this is Store that
|
||||
* does not yet have any store files.
|
||||
*/
|
||||
public static long getMaxSequenceIdInList(Collection<StoreFile> sfs,
|
||||
boolean includeBulkLoadedFiles) {
|
||||
public static long getMaxSequenceIdInList(Collection<StoreFile> sfs) {
|
||||
long max = 0;
|
||||
for (StoreFile sf : sfs) {
|
||||
if (includeBulkLoadedFiles || !sf.isBulkLoadResult()) {
|
||||
max = Math.max(max, sf.getMaxSequenceId());
|
||||
}
|
||||
}
|
||||
return max;
|
||||
}
|
||||
|
||||
|
|
|
@ -32,21 +32,16 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.LargeTests;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.HFileTestUtil;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
@ -69,10 +64,6 @@ public class TestLoadIncrementalHFiles {
|
|||
Bytes.toBytes("ppp")
|
||||
};
|
||||
|
||||
public static int BLOCKSIZE = 64*1024;
|
||||
public static Algorithm COMPRESSION =
|
||||
Compression.Algorithm.NONE;
|
||||
|
||||
static HBaseTestingUtility util = new HBaseTestingUtility();
|
||||
|
||||
@BeforeClass
|
||||
|
@ -149,7 +140,7 @@ public class TestLoadIncrementalHFiles {
|
|||
for (byte[][] range : hfileRanges) {
|
||||
byte[] from = range[0];
|
||||
byte[] to = range[1];
|
||||
createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
|
||||
HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
|
||||
+ hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
|
||||
}
|
||||
int expectedRows = hfileIdx * 1000;
|
||||
|
@ -189,7 +180,7 @@ public class TestLoadIncrementalHFiles {
|
|||
for (byte[][] range : hFileRanges) {
|
||||
byte[] from = range[0];
|
||||
byte[] to = range[1];
|
||||
createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
|
||||
HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
|
||||
+ hFileIdx++), FAMILY, QUALIFIER, from, to, 1000);
|
||||
}
|
||||
|
||||
|
@ -228,7 +219,7 @@ public class TestLoadIncrementalHFiles {
|
|||
FileSystem fs = util.getTestFileSystem();
|
||||
Path testIn = new Path(dir, "testhfile");
|
||||
HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
|
||||
createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
|
||||
HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
|
||||
Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
|
||||
|
||||
Path bottomOut = new Path(dir, "bottom.out");
|
||||
|
@ -261,40 +252,6 @@ public class TestLoadIncrementalHFiles {
|
|||
return count;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create an HFile with the given number of rows between a given
|
||||
* start key and end key.
|
||||
* TODO put me in an HFileTestUtil or something?
|
||||
*/
|
||||
static void createHFile(
|
||||
Configuration configuration,
|
||||
FileSystem fs, Path path,
|
||||
byte[] family, byte[] qualifier,
|
||||
byte[] startKey, byte[] endKey, int numRows) throws IOException
|
||||
{
|
||||
HFileContext meta = new HFileContextBuilder()
|
||||
.withBlockSize(BLOCKSIZE)
|
||||
.withCompression(COMPRESSION)
|
||||
.build();
|
||||
HFile.Writer writer = HFile.getWriterFactory(configuration, new CacheConfig(configuration))
|
||||
.withPath(fs, path)
|
||||
.withFileContext(meta)
|
||||
.create();
|
||||
long now = System.currentTimeMillis();
|
||||
try {
|
||||
// subtract 2 since iterateOnSplits doesn't include boundary keys
|
||||
for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, numRows-2)) {
|
||||
KeyValue kv = new KeyValue(key, family, qualifier, now, key);
|
||||
writer.append(kv);
|
||||
}
|
||||
} finally {
|
||||
writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
|
||||
Bytes.toBytes(System.currentTimeMillis()));
|
||||
writer.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
|
||||
Integer value = map.containsKey(first)?map.get(first):0;
|
||||
map.put(first, value+1);
|
||||
|
@ -370,7 +327,7 @@ public class TestLoadIncrementalHFiles {
|
|||
byte[] from = Bytes.toBytes("begin");
|
||||
byte[] to = Bytes.toBytes("end");
|
||||
for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) {
|
||||
createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
|
||||
HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
|
||||
+ i), FAMILY, QUALIFIER, from, to, 1000);
|
||||
}
|
||||
|
||||
|
|
|
@ -3435,7 +3435,7 @@ public class TestHRegion {
|
|||
|
||||
@Override
|
||||
public void doAnAction() throws Exception {
|
||||
if (region.flushcache()) {
|
||||
if (region.flushcache().isCompactionNeeded()) {
|
||||
++flushesSinceCompact;
|
||||
}
|
||||
// Compact regularly to avoid creating too many files and exceeding
|
||||
|
@ -4292,6 +4292,42 @@ public class TestHRegion {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that we get the expected flush results back
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testFlushResult() throws IOException {
|
||||
String method = name.getMethodName();
|
||||
byte[] tableName = Bytes.toBytes(method);
|
||||
byte[] family = Bytes.toBytes("family");
|
||||
|
||||
this.region = initHRegion(tableName, method, family);
|
||||
|
||||
// empty memstore, flush doesn't run
|
||||
HRegion.FlushResult fr = region.flushcache();
|
||||
assertFalse(fr.isFlushSucceeded());
|
||||
assertFalse(fr.isCompactionNeeded());
|
||||
|
||||
// Flush enough files to get up to the threshold, doesn't need compactions
|
||||
for (int i = 0; i < 2; i++) {
|
||||
Put put = new Put(tableName).add(family, family, tableName);
|
||||
region.put(put);
|
||||
fr = region.flushcache();
|
||||
assertTrue(fr.isFlushSucceeded());
|
||||
assertFalse(fr.isCompactionNeeded());
|
||||
}
|
||||
|
||||
// Two flushes after the threshold, compactions are needed
|
||||
for (int i = 0; i < 2; i++) {
|
||||
Put put = new Put(tableName).add(family, family, tableName);
|
||||
region.put(put);
|
||||
fr = region.flushcache();
|
||||
assertTrue(fr.isFlushSucceeded());
|
||||
assertTrue(fr.isCompactionNeeded());
|
||||
}
|
||||
}
|
||||
|
||||
private Configuration initSplit() {
|
||||
// Always compact if there is more than one store file.
|
||||
CONF.setInt("hbase.hstore.compactionThreshold", 2);
|
||||
|
|
|
@ -57,9 +57,6 @@ import org.apache.hadoop.hbase.client.Put;
|
|||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
|
||||
|
@ -77,6 +74,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.apache.hadoop.hbase.util.EnvironmentEdge;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.HFileTestUtil;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
|
@ -319,7 +317,7 @@ public class TestWALReplay {
|
|||
throws IOException, SecurityException, IllegalArgumentException,
|
||||
NoSuchFieldException, IllegalAccessException, InterruptedException {
|
||||
final TableName tableName =
|
||||
TableName.valueOf("testReplayEditsWrittenViaHRegion");
|
||||
TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly");
|
||||
final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
|
||||
final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
|
||||
deleteDir(basedir);
|
||||
|
@ -329,21 +327,22 @@ public class TestWALReplay {
|
|||
HRegion.closeHRegion(region2);
|
||||
HLog wal = createWAL(this.conf);
|
||||
HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
|
||||
Path f = new Path(basedir, "hfile");
|
||||
HFileContext context = new HFileContextBuilder().build();
|
||||
HFile.Writer writer =
|
||||
HFile.getWriterFactoryNoCache(conf).withPath(fs, f)
|
||||
.withFileContext(context).create();
|
||||
|
||||
byte [] family = htd.getFamilies().iterator().next().getName();
|
||||
byte [] row = tableName.getName();
|
||||
writer.append(new KeyValue(row, family, family, row));
|
||||
writer.close();
|
||||
Path f = new Path(basedir, "hfile");
|
||||
HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""),
|
||||
Bytes.toBytes("z"), 10);
|
||||
List <Pair<byte[],String>> hfs= new ArrayList<Pair<byte[],String>>(1);
|
||||
hfs.add(Pair.newPair(family, f.toString()));
|
||||
region.bulkLoadHFiles(hfs, true);
|
||||
|
||||
// Add an edit so something in the WAL
|
||||
byte [] row = tableName.getName();
|
||||
region.put((new Put(row)).add(family, family, family));
|
||||
wal.sync();
|
||||
final int rowsInsertedCount = 11;
|
||||
|
||||
assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
|
||||
|
||||
// Now 'crash' the region by stealing its wal
|
||||
final Configuration newConf = HBaseConfiguration.create(this.conf);
|
||||
|
@ -358,6 +357,7 @@ public class TestWALReplay {
|
|||
hbaseRootDir, hri, htd, wal2);
|
||||
long seqid2 = region2.getOpenSeqNum();
|
||||
assertTrue(seqid2 > -1);
|
||||
assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
|
||||
|
||||
// I can't close wal1. Its been appropriated when we split.
|
||||
region2.close();
|
||||
|
@ -367,6 +367,78 @@ public class TestWALReplay {
|
|||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* HRegion test case that is made of a major compacted HFile (created with three bulk loaded
|
||||
* files) and an edit in the memstore.
|
||||
* This is for HBASE-10958 "[dataloss] Bulk loading with seqids can prevent some log entries
|
||||
* from being replayed"
|
||||
* @throws IOException
|
||||
* @throws IllegalAccessException
|
||||
* @throws NoSuchFieldException
|
||||
* @throws IllegalArgumentException
|
||||
* @throws SecurityException
|
||||
*/
|
||||
@Test
|
||||
public void testCompactedBulkLoadedFiles()
|
||||
throws IOException, SecurityException, IllegalArgumentException,
|
||||
NoSuchFieldException, IllegalAccessException, InterruptedException {
|
||||
final TableName tableName =
|
||||
TableName.valueOf("testCompactedBulkLoadedFiles");
|
||||
final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
|
||||
final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
|
||||
deleteDir(basedir);
|
||||
final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
|
||||
HRegion region2 = HRegion.createHRegion(hri,
|
||||
hbaseRootDir, this.conf, htd);
|
||||
HRegion.closeHRegion(region2);
|
||||
HLog wal = createWAL(this.conf);
|
||||
HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
|
||||
|
||||
// Add an edit so something in the WAL
|
||||
byte [] row = tableName.getName();
|
||||
byte [] family = htd.getFamilies().iterator().next().getName();
|
||||
region.put((new Put(row)).add(family, family, family));
|
||||
wal.sync();
|
||||
|
||||
List <Pair<byte[],String>> hfs= new ArrayList<Pair<byte[],String>>(1);
|
||||
for (int i = 0; i < 3; i++) {
|
||||
Path f = new Path(basedir, "hfile"+i);
|
||||
HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"),
|
||||
Bytes.toBytes(i + "50"), 10);
|
||||
hfs.add(Pair.newPair(family, f.toString()));
|
||||
}
|
||||
region.bulkLoadHFiles(hfs, true);
|
||||
final int rowsInsertedCount = 31;
|
||||
assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
|
||||
|
||||
// major compact to turn all the bulk loaded files into one normal file
|
||||
region.compactStores(true);
|
||||
assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
|
||||
|
||||
// Now 'crash' the region by stealing its wal
|
||||
final Configuration newConf = HBaseConfiguration.create(this.conf);
|
||||
User user = HBaseTestingUtility.getDifferentUser(newConf,
|
||||
tableName.getNameAsString());
|
||||
user.runAs(new PrivilegedExceptionAction() {
|
||||
public Object run() throws Exception {
|
||||
runWALSplit(newConf);
|
||||
HLog wal2 = createWAL(newConf);
|
||||
|
||||
HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
|
||||
hbaseRootDir, hri, htd, wal2);
|
||||
long seqid2 = region2.getOpenSeqNum();
|
||||
assertTrue(seqid2 > -1);
|
||||
assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
|
||||
|
||||
// I can't close wal1. Its been appropriated when we split.
|
||||
region2.close();
|
||||
wal2.closeAndDelete();
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Test writing edits into an HRegion, closing it, splitting logs, opening
|
||||
* Region again. Verify seqids.
|
||||
|
@ -736,14 +808,14 @@ public class TestWALReplay {
|
|||
try {
|
||||
final HRegion region =
|
||||
new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
|
||||
protected boolean internalFlushcache(
|
||||
protected FlushResult internalFlushcache(
|
||||
final HLog wal, final long myseqid, MonitoredTask status)
|
||||
throws IOException {
|
||||
LOG.info("InternalFlushCache Invoked");
|
||||
boolean b = super.internalFlushcache(wal, myseqid,
|
||||
FlushResult fs = super.internalFlushcache(wal, myseqid,
|
||||
Mockito.mock(MonitoredTask.class));
|
||||
flushcount.incrementAndGet();
|
||||
return b;
|
||||
return fs;
|
||||
};
|
||||
};
|
||||
long seqid = region.initialize();
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Utility class for HFile-related testing.
|
||||
*/
|
||||
public class HFileTestUtil {
|
||||
|
||||
/**
|
||||
* Create an HFile with the given number of rows between a given
|
||||
* start key and end key.
|
||||
*/
|
||||
public static void createHFile(
|
||||
Configuration configuration,
|
||||
FileSystem fs, Path path,
|
||||
byte[] family, byte[] qualifier,
|
||||
byte[] startKey, byte[] endKey, int numRows) throws IOException
|
||||
{
|
||||
HFileContext meta = new HFileContextBuilder().build();
|
||||
HFile.Writer writer = HFile.getWriterFactory(configuration, new CacheConfig(configuration))
|
||||
.withPath(fs, path)
|
||||
.withFileContext(meta)
|
||||
.create();
|
||||
long now = System.currentTimeMillis();
|
||||
try {
|
||||
// subtract 2 since iterateOnSplits doesn't include boundary keys
|
||||
for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, numRows-2)) {
|
||||
KeyValue kv = new KeyValue(key, family, qualifier, now, key);
|
||||
writer.append(kv);
|
||||
}
|
||||
} finally {
|
||||
writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
|
||||
Bytes.toBytes(System.currentTimeMillis()));
|
||||
writer.close();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue