HBASE-17662 Disable in-memory flush when replaying from WAL

Signed-off-by: anoopsamjohn <anoopsamjohn@gmail.com>
This commit is contained in:
anastas 2017-03-01 10:01:30 +02:00 committed by anoopsamjohn
parent 4a5eba5e59
commit 613bcb3622
8 changed files with 88 additions and 20 deletions

View File

@ -54,8 +54,9 @@ public abstract class AbstractMemStore implements MemStore {
// Used to track when to flush // Used to track when to flush
private volatile long timeOfOldestEdit; private volatile long timeOfOldestEdit;
public final static long FIXED_OVERHEAD = ClassSize public final static long FIXED_OVERHEAD = ClassSize.OBJECT
.align(ClassSize.OBJECT + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)); + (4 * ClassSize.REFERENCE)
+ (2 * Bytes.SIZEOF_LONG); // snapshotId, timeOfOldestEdit
public final static long DEEP_OVERHEAD = FIXED_OVERHEAD; public final static long DEEP_OVERHEAD = FIXED_OVERHEAD;

View File

@ -71,16 +71,22 @@ public class CompactingMemStore extends AbstractMemStore {
private long inmemoryFlushSize; // the threshold on active size for in-memory flush private long inmemoryFlushSize; // the threshold on active size for in-memory flush
private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false); private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false);
// inWalReplay is true while we are synchronously replaying the edits from WAL
private boolean inWalReplay = false;
@VisibleForTesting @VisibleForTesting
private final AtomicBoolean allowCompaction = new AtomicBoolean(true); private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
private boolean compositeSnapshot = true; private boolean compositeSnapshot = true;
public static final long DEEP_OVERHEAD = AbstractMemStore.DEEP_OVERHEAD
public static final long DEEP_OVERHEAD = ClassSize.align( AbstractMemStore.DEEP_OVERHEAD
+ 6 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline, + 6 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline,
// MemStoreCompactor, inMemoryFlushInProgress, allowCompaction // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction
+ Bytes.SIZEOF_LONG // inmemoryFlushSize + Bytes.SIZEOF_LONG // inmemoryFlushSize
+ 2 * Bytes.SIZEOF_BOOLEAN // compositeSnapshot and inWalReplay
+ 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryFlushInProgress and allowCompaction + 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryFlushInProgress and allowCompaction
+ CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD; + CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD);
public CompactingMemStore(Configuration conf, CellComparator c, public CompactingMemStore(Configuration conf, CellComparator c,
HStore store, RegionServicesForStores regionServices, HStore store, RegionServicesForStores regionServices,
@ -232,6 +238,24 @@ public class CompactingMemStore extends AbstractMemStore {
} }
} }
/**
* This message intends to inform the MemStore that next coming updates
* are going to be part of the replaying edits from WAL
*/
@Override
public void startReplayingFromWAL() {
inWalReplay = true;
}
/**
* This message intends to inform the MemStore that the replaying edits from WAL
* are done
*/
@Override
public void stopReplayingFromWAL() {
inWalReplay = false;
}
// the getSegments() method is used for tests only // the getSegments() method is used for tests only
@VisibleForTesting @VisibleForTesting
@Override @Override
@ -388,6 +412,9 @@ public class CompactingMemStore extends AbstractMemStore {
private boolean shouldFlushInMemory() { private boolean shouldFlushInMemory() {
if (this.active.keySize() > inmemoryFlushSize) { // size above flush threshold if (this.active.keySize() > inmemoryFlushSize) { // size above flush threshold
if (inWalReplay) { // when replaying edits from WAL there is no need in in-memory flush
return false; // regardless the size
}
// the inMemoryFlushInProgress is CASed to be true here in order to mutual exclude // the inMemoryFlushInProgress is CASed to be true here in order to mutual exclude
// the insert of the active into the compaction pipeline // the insert of the active into the compaction pipeline
return (inMemoryFlushInProgress.compareAndSet(false,true)); return (inMemoryFlushInProgress.compareAndSet(false,true));

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/** /**
@ -57,6 +58,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
public class DefaultMemStore extends AbstractMemStore { public class DefaultMemStore extends AbstractMemStore {
private static final Log LOG = LogFactory.getLog(DefaultMemStore.class); private static final Log LOG = LogFactory.getLog(DefaultMemStore.class);
public final static long DEEP_OVERHEAD = ClassSize.align(AbstractMemStore.DEEP_OVERHEAD);
public final static long FIXED_OVERHEAD = ClassSize.align(AbstractMemStore.FIXED_OVERHEAD);
/** /**
* Default constructor. Used for tests. * Default constructor. Used for tests.
*/ */

View File

@ -878,11 +878,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
long maxSeqId = initializeStores(reporter, status); long maxSeqId = initializeStores(reporter, status);
this.mvcc.advanceTo(maxSeqId); this.mvcc.advanceTo(maxSeqId);
if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) { if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
List<Store> stores = this.getStores(); // update the stores that we are replaying
try {
for (Store store : stores) {
((HStore) store).startReplayingFromWAL();
}
// Recover any edits if available. // Recover any edits if available.
maxSeqId = Math.max(maxSeqId, maxSeqId = Math.max(maxSeqId,
replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status)); replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
// Make sure mvcc is up to max. // Make sure mvcc is up to max.
this.mvcc.advanceTo(maxSeqId); this.mvcc.advanceTo(maxSeqId);
} finally {
for (Store store : stores) { // update the stores that we are done replaying
((HStore)store).stopReplayingFromWAL();
}
}
} }
this.lastReplayedOpenRegionSeqId = maxSeqId; this.lastReplayedOpenRegionSeqId = maxSeqId;

View File

@ -662,6 +662,22 @@ public class HStore implements Store {
return storeFile; return storeFile;
} }
/**
* This message intends to inform the MemStore that next coming updates
* are going to be part of the replaying edits from WAL
*/
public void startReplayingFromWAL(){
this.memstore.startReplayingFromWAL();
}
/**
* This message intends to inform the MemStore that the replaying edits from WAL
* are done
*/
public void stopReplayingFromWAL(){
this.memstore.stopReplayingFromWAL();
}
/** /**
* Adds a value to the memstore * Adds a value to the memstore
* @param cell * @param cell

View File

@ -127,4 +127,16 @@ public interface MemStore {
/* Return true if the memstore may use some extra memory space*/ /* Return true if the memstore may use some extra memory space*/
boolean isSloppy(); boolean isSloppy();
/**
* This message intends to inform the MemStore that next coming updates
* are going to be part of the replaying edits from WAL
*/
default void startReplayingFromWAL(){return;}
/**
* This message intends to inform the MemStore that the replaying edits from WAL
* are done
*/
default void stopReplayingFromWAL(){return;}
} }

View File

@ -324,10 +324,10 @@ public class TestHeapSize {
expected += ClassSize.estimateBase(AtomicBoolean.class, false); expected += ClassSize.estimateBase(AtomicBoolean.class, false);
expected += ClassSize.estimateBase(AtomicBoolean.class, false); expected += ClassSize.estimateBase(AtomicBoolean.class, false);
expected += ClassSize.estimateBase(CompactionPipeline.class, false); expected += ClassSize.estimateBase(CompactionPipeline.class, false);
expected += ClassSize.estimateBase(LinkedList.class, false); expected += ClassSize.estimateBase(LinkedList.class, false); //inside CompactionPipeline
expected += ClassSize.estimateBase(LinkedList.class, false); expected += ClassSize.estimateBase(LinkedList.class, false); //inside CompactionPipeline
expected += ClassSize.estimateBase(MemStoreCompactor.class, false); expected += ClassSize.estimateBase(MemStoreCompactor.class, false);
expected += ClassSize.estimateBase(AtomicBoolean.class, false); expected += ClassSize.estimateBase(AtomicBoolean.class, false);// inside MemStoreCompactor
if (expected != actual) { if (expected != actual) {
ClassSize.estimateBase(cl, true); ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(AtomicBoolean.class, true); ClassSize.estimateBase(AtomicBoolean.class, true);

View File

@ -127,8 +127,6 @@ public abstract class AbstractTestWALReplay {
Configuration conf = TEST_UTIL.getConfiguration(); Configuration conf = TEST_UTIL.getConfiguration();
// The below config supported by 0.20-append and CDH3b2 // The below config supported by 0.20-append and CDH3b2
conf.setInt("dfs.client.block.recovery.retries", 2); conf.setInt("dfs.client.block.recovery.retries", 2);
conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(MemoryCompactionPolicy.NONE));
TEST_UTIL.startMiniCluster(3); TEST_UTIL.startMiniCluster(3);
Path hbaseRootDir = Path hbaseRootDir =
TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase")); TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));