HBASE-16608 Introducing the ability to merge ImmutableSegments without copy-compaction or SQM usage. (Anastasia)

This commit is contained in:
anoopsamjohn 2016-10-24 23:09:48 +05:30
parent 1b12a60392
commit 988d1f9bc9
16 changed files with 910 additions and 331 deletions

View File

@ -195,9 +195,9 @@ public class CompactingMemStore extends AbstractMemStore {
return list; return list;
} }
public boolean swapCompactedSegments(VersionedSegmentsList versionedList, public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result,
ImmutableSegment result) { boolean merge) {
return pipeline.swap(versionedList, result); return pipeline.swap(versionedList, result, !merge);
} }
/** /**
@ -394,6 +394,11 @@ public class CompactingMemStore extends AbstractMemStore {
allowCompaction.set(true); allowCompaction.set(true);
} }
@VisibleForTesting
void initiateType() {
compactor.initiateAction();
}
/** /**
* @param cell Find the row that comes after this one. If null, we return the * @param cell Find the row that comes after this one. If null, we return the
* first. * first.

View File

@ -90,13 +90,16 @@ public class CompactionPipeline {
* Swapping only if there were no changes to the suffix of the list while it was compacted. * Swapping only if there were no changes to the suffix of the list while it was compacted.
* @param versionedList tail of the pipeline that was compacted * @param versionedList tail of the pipeline that was compacted
* @param segment new compacted segment * @param segment new compacted segment
* @param closeSuffix whether to close the suffix (to release memory), as part of swapping it out
* During index merge op this will be false and for compaction it will be true.
* @return true iff swapped tail with new compacted segment * @return true iff swapped tail with new compacted segment
*/ */
public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment) { public boolean swap(
VersionedSegmentsList versionedList, ImmutableSegment segment, boolean closeSuffix) {
if (versionedList.getVersion() != version) { if (versionedList.getVersion() != version) {
return false; return false;
} }
LinkedList<ImmutableSegment> suffix; List<ImmutableSegment> suffix;
synchronized (pipeline){ synchronized (pipeline){
if(versionedList.getVersion() != version) { if(versionedList.getVersion() != version) {
return false; return false;
@ -108,13 +111,14 @@ public class CompactionPipeline {
+ versionedList.getStoreSegments().size() + versionedList.getStoreSegments().size()
+ ", and the number of cells in new segment is:" + segment.getCellsCount()); + ", and the number of cells in new segment is:" + segment.getCellsCount());
} }
swapSuffix(suffix,segment); swapSuffix(suffix,segment, closeSuffix);
} }
if (region != null) { if (region != null) {
// update the global memstore size counter // update the global memstore size counter
long suffixSize = getSegmentsKeySize(suffix); long suffixSize = getSegmentsKeySize(suffix);
long newSize = segment.keySize(); long newSize = segment.keySize();
long delta = suffixSize - newSize; long delta = suffixSize - newSize;
assert ( closeSuffix || delta>0 ); // sanity check
long globalMemstoreSize = region.addAndGetGlobalMemstoreSize(-delta); long globalMemstoreSize = region.addAndGetGlobalMemstoreSize(-delta);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Suffix size: " + suffixSize + " compacted item size: " + newSize LOG.debug("Suffix size: " + suffixSize + " compacted item size: " + newSize
@ -204,11 +208,20 @@ public class CompactionPipeline {
return pipeline.peekLast().keySize(); return pipeline.peekLast().keySize();
} }
private void swapSuffix(LinkedList<ImmutableSegment> suffix, ImmutableSegment segment) { private void swapSuffix(List<ImmutableSegment> suffix, ImmutableSegment segment,
boolean closeSegmentsInSuffix) {
version++; version++;
// During index merge we won't be closing the segments undergoing the merge. Segment#close()
// will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy
// from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data
// compaction, we would have copied the cells data from old MSLAB chunks into a new chunk
// created for the result segment. So we can release the chunks associated with the compacted
// segments.
if (closeSegmentsInSuffix) {
for (Segment itemInSuffix : suffix) { for (Segment itemInSuffix : suffix) {
itemInSuffix.close(); itemInSuffix.close();
} }
}
pipeline.removeAll(suffix); pipeline.removeAll(suffix);
pipeline.addLast(segment); pipeline.addLast(segment);
} }

View File

@ -69,6 +69,8 @@ public class HeapMemStoreLAB implements MemStoreLAB {
private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>(); private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
// A queue of chunks from pool contained by this memstore LAB // A queue of chunks from pool contained by this memstore LAB
// TODO: in the future, it would be better to have List implementation instead of Queue,
// as FIFO order is not so important here
@VisibleForTesting @VisibleForTesting
BlockingQueue<PooledChunk> pooledChunkQueue = null; BlockingQueue<PooledChunk> pooledChunkQueue = null;
private final int chunkSize; private final int chunkSize;
@ -101,11 +103,11 @@ public class HeapMemStoreLAB implements MemStoreLAB {
} }
// if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one! // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
Preconditions.checkArgument( Preconditions.checkArgument(maxAlloc <= chunkSize,
maxAlloc <= chunkSize,
MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY); MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
} }
@Override @Override
public Cell copyCellInto(Cell cell) { public Cell copyCellInto(Cell cell) {
int size = KeyValueUtil.length(cell); int size = KeyValueUtil.length(cell);
@ -236,8 +238,8 @@ public class HeapMemStoreLAB implements MemStoreLAB {
return this.curChunk.get(); return this.curChunk.get();
} }
@VisibleForTesting
BlockingQueue<PooledChunk> getChunkQueue() { BlockingQueue<PooledChunk> getPooledChunks() {
return this.pooledChunkQueue; return this.pooledChunkQueue;
} }
} }

View File

@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* A MemStoreLAB implementation which wraps N MemStoreLABs. Its main duty is in proper managing the
* close of the individual MemStoreLAB. This is treated as an immutable one and so do not allow to
* add any more Cells into it. {@link #copyCellInto(Cell)} throws Exception
*/
@InterfaceAudience.Private
public class ImmutableMemStoreLAB implements MemStoreLAB {
private final AtomicInteger openScannerCount = new AtomicInteger();
private volatile boolean closed = false;
private final List<MemStoreLAB> mslabs;
public ImmutableMemStoreLAB(List<MemStoreLAB> mslabs) {
this.mslabs = mslabs;
}
@Override
public Cell copyCellInto(Cell cell) {
throw new IllegalStateException("This is an Immutable MemStoreLAB.");
}
@Override
public void close() {
// 'openScannerCount' here tracks the scanners opened on segments which directly refer to this
// MSLAB. The individual MSLABs this refers also having its own 'openScannerCount'. The usage of
// the variable in close() and decScannerCount() is as as that in HeapMemstoreLAB. Here the
// close just delegates the call to the individual MSLABs. The actual return of the chunks to
// MSLABPool will happen within individual MSLABs only (which is at the leaf level).
// Say an ImmutableMemStoreLAB is created over 2 HeapMemStoreLABs at some point and at that time
// both of them were referred by ongoing scanners. So they have > 0 'openScannerCount'. Now over
// the new Segment some scanners come in and this MSLABs 'openScannerCount' also goes up and
// then come down on finish of scanners. Now a close() call comes to this Immutable MSLAB. As
// it's 'openScannerCount' is zero it will call close() on both of the Heap MSLABs. Say by that
// time the old scanners on one of the MSLAB got over where as on the other, still an old
// scanner is going on. The call close() on that MSLAB will not close it immediately but will
// just mark it for closure as it's 'openScannerCount' still > 0. Later once the old scan is
// over, the decScannerCount() call will do the actual close and return of the chunks.
this.closed = true;
// When there are still on going scanners over this MSLAB, we will defer the close until all
// scanners finish. We will just mark it for closure. See #decScannerCount(). This will be
// called at end of every scan. When it is marked for closure and scanner count reached 0, we
// will do the actual close then.
checkAndCloseMSLABs(openScannerCount.get());
}
private void checkAndCloseMSLABs(int openScanners) {
if (openScanners == 0) {
for (MemStoreLAB mslab : this.mslabs) {
mslab.close();
}
}
}
@Override
public void incScannerCount() {
this.openScannerCount.incrementAndGet();
}
@Override
public void decScannerCount() {
int count = this.openScannerCount.decrementAndGet();
if (this.closed) {
checkAndCloseMSLABs(count);
}
}
}

View File

@ -85,13 +85,14 @@ public class ImmutableSegment extends Segment {
* The input parameter "type" exists for future use when more types of flat ImmutableSegments * The input parameter "type" exists for future use when more types of flat ImmutableSegments
* are going to be introduced. * are going to be introduced.
*/ */
protected ImmutableSegment(CellComparator comparator, MemStoreCompactorIterator iterator, protected ImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
MemStoreLAB memStoreLAB, int numOfCells, Type type) { MemStoreLAB memStoreLAB, int numOfCells, Type type, boolean merge) {
super(null, // initiailize the CellSet with NULL super(null, // initiailize the CellSet with NULL
comparator, memStoreLAB); comparator, memStoreLAB);
this.type = type; this.type = type;
// build the true CellSet based on CellArrayMap // build the true CellSet based on CellArrayMap
CellSet cs = createCellArrayMapSet(numOfCells, iterator); CellSet cs = createCellArrayMapSet(numOfCells, iterator, merge);
this.setCellSet(null, cs); // update the CellSet of the new Segment this.setCellSet(null, cs); // update the CellSet of the new Segment
this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange(); this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange();
@ -102,7 +103,7 @@ public class ImmutableSegment extends Segment {
* list of older ImmutableSegments. * list of older ImmutableSegments.
* The given iterator returns the Cells that "survived" the compaction. * The given iterator returns the Cells that "survived" the compaction.
*/ */
protected ImmutableSegment(CellComparator comparator, MemStoreCompactorIterator iterator, protected ImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
MemStoreLAB memStoreLAB) { MemStoreLAB memStoreLAB) {
super(new CellSet(comparator), // initiailize the CellSet with empty CellSet super(new CellSet(comparator), // initiailize the CellSet with empty CellSet
comparator, memStoreLAB); comparator, memStoreLAB);
@ -155,7 +156,7 @@ public class ImmutableSegment extends Segment {
/**------------------------------------------------------------------------ /**------------------------------------------------------------------------
* Change the CellSet of this ImmutableSegment from one based on ConcurrentSkipListMap to one * Change the CellSet of this ImmutableSegment from one based on ConcurrentSkipListMap to one
* based on CellArrayMap. * based on CellArrayMap.
* If this ImmutableSegment is not based on ConcurrentSkipListMap , this is NOP * If this ImmutableSegment is not based on ConcurrentSkipListMap , this is NOOP
* *
* Synchronization of the CellSet replacement: * Synchronization of the CellSet replacement:
* The reference to the CellSet is AtomicReference and is updated only when ImmutableSegment * The reference to the CellSet is AtomicReference and is updated only when ImmutableSegment
@ -188,19 +189,26 @@ public class ImmutableSegment extends Segment {
///////////////////// PRIVATE METHODS ///////////////////// ///////////////////// PRIVATE METHODS /////////////////////
/*------------------------------------------------------------------------*/ /*------------------------------------------------------------------------*/
// Create CellSet based on CellArrayMap from compacting iterator // Create CellSet based on CellArrayMap from compacting iterator
private CellSet createCellArrayMapSet(int numOfCells, MemStoreCompactorIterator iterator) { private CellSet createCellArrayMapSet(int numOfCells, MemStoreSegmentsIterator iterator,
boolean merge) {
Cell[] cells = new Cell[numOfCells]; // build the Cell Array Cell[] cells = new Cell[numOfCells]; // build the Cell Array
int i = 0; int i = 0;
while (iterator.hasNext()) { while (iterator.hasNext()) {
Cell c = iterator.next(); Cell c = iterator.next();
// The scanner behind the iterator is doing all the elimination logic // The scanner behind the iterator is doing all the elimination logic
if (merge) {
// if this is merge we just move the Cell object without copying MSLAB
// the sizes still need to be updated in the new segment
cells[i] = c;
} else {
// now we just copy it to the new segment (also MSLAB copy) // now we just copy it to the new segment (also MSLAB copy)
cells[i] = maybeCloneWithAllocator(c); cells[i] = maybeCloneWithAllocator(c);
boolean usedMSLAB = (cells[i] != c); }
boolean useMSLAB = (getMemStoreLAB()!=null);
// second parameter true, because in compaction addition of the cell to new segment // second parameter true, because in compaction addition of the cell to new segment
// is always successful // is always successful
updateMetaInfo(c, true, usedMSLAB); // updates the size per cell updateMetaInfo(c, true, useMSLAB); // updates the size per cell
i++; i++;
} }
// build the immutable CellSet // build the immutable CellSet

View File

@ -18,6 +18,7 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
@ -43,37 +44,30 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class MemStoreCompactor { public class MemStoreCompactor {
public static final long DEEP_OVERHEAD = ClassSize public static final long DEEP_OVERHEAD = ClassSize
.align(ClassSize.OBJECT + 4 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + Bytes.SIZEOF_DOUBLE .align(ClassSize.OBJECT
+ ClassSize.ATOMIC_BOOLEAN); + 4 * ClassSize.REFERENCE
// compactingMemStore, versionedList, action, isInterrupted (the reference)
// "action" is an enum and thus it is a class with static final constants,
// so counting only the size of the reference to it and not the size of the internals
+ Bytes.SIZEOF_INT // compactionKVMax
+ ClassSize.ATOMIC_BOOLEAN // isInterrupted (the internals)
);
// Option for external guidance whether flattening is allowed // Configuration options for MemStore compaction
static final String MEMSTORE_COMPACTOR_FLATTENING = "hbase.hregion.compacting.memstore.flatten"; static final String INDEX_COMPACTION_CONFIG = "index-compaction";
static final boolean MEMSTORE_COMPACTOR_FLATTENING_DEFAULT = true; static final String DATA_COMPACTION_CONFIG = "data-compaction";
// Option for external setting of the compacted structure (SkipList, CellArray, etc.) // The external setting of the compacting MemStore behaviour
// Compaction of the index without the data is the default
static final String COMPACTING_MEMSTORE_TYPE_KEY = "hbase.hregion.compacting.memstore.type"; static final String COMPACTING_MEMSTORE_TYPE_KEY = "hbase.hregion.compacting.memstore.type";
static final int COMPACTING_MEMSTORE_TYPE_DEFAULT = 2; // COMPACT_TO_ARRAY_MAP as default static final String COMPACTING_MEMSTORE_TYPE_DEFAULT = INDEX_COMPACTION_CONFIG;
// What percentage of the duplications is causing compaction? // The upper bound for the number of segments we store in the pipeline prior to merging.
static final String COMPACTION_THRESHOLD_REMAIN_FRACTION // This constant is subject to further experimentation.
= "hbase.hregion.compacting.memstore.comactPercent"; private static final int THRESHOLD_PIPELINE_SEGMENTS = 1;
static final double COMPACTION_THRESHOLD_REMAIN_FRACTION_DEFAULT = 0.2;
// Option for external guidance whether the flattening is allowed
static final String MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN
= "hbase.hregion.compacting.memstore.avoidSpeculativeScan";
static final boolean MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN_DEFAULT = false;
private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class); private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class);
/**
* Types of Compaction
*/
private enum Type {
COMPACT_TO_SKIPLIST_MAP,
COMPACT_TO_ARRAY_MAP
}
private CompactingMemStore compactingMemStore; private CompactingMemStore compactingMemStore;
// a static version of the segment list from the pipeline // a static version of the segment list from the pipeline
@ -82,22 +76,28 @@ public class MemStoreCompactor {
// a flag raised when compaction is requested to stop // a flag raised when compaction is requested to stop
private final AtomicBoolean isInterrupted = new AtomicBoolean(false); private final AtomicBoolean isInterrupted = new AtomicBoolean(false);
// the limit to the size of the groups to be later provided to MemStoreCompactorIterator // the limit to the size of the groups to be later provided to MemStoreSegmentsIterator
private final int compactionKVMax; private final int compactionKVMax;
double fraction = 0.8; /**
* Types of actions to be done on the pipeline upon MemStoreCompaction invocation.
* Note that every value covers the previous ones, i.e. if MERGE is the action it implies
* that the youngest segment is going to be flatten anyway.
*/
private enum Action {
NOOP,
FLATTEN, // flatten the youngest segment in the pipeline
MERGE, // merge all the segments in the pipeline into one
COMPACT // copy-compact the data of all the segments in the pipeline
}
int immutCellsNum = 0; // number of immutable for compaction cells private Action action = Action.FLATTEN;
private Type type = Type.COMPACT_TO_ARRAY_MAP;
public MemStoreCompactor(CompactingMemStore compactingMemStore) { public MemStoreCompactor(CompactingMemStore compactingMemStore) {
this.compactingMemStore = compactingMemStore; this.compactingMemStore = compactingMemStore;
this.compactionKVMax = compactingMemStore.getConfiguration().getInt( this.compactionKVMax = compactingMemStore.getConfiguration()
HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); .getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
this.fraction = 1 - compactingMemStore.getConfiguration().getDouble( initiateAction();
COMPACTION_THRESHOLD_REMAIN_FRACTION,
COMPACTION_THRESHOLD_REMAIN_FRACTION_DEFAULT);
} }
/**---------------------------------------------------------------------- /**----------------------------------------------------------------------
@ -106,26 +106,16 @@ public class MemStoreCompactor {
* is already an ongoing compaction or no segments to compact. * is already an ongoing compaction or no segments to compact.
*/ */
public boolean start() throws IOException { public boolean start() throws IOException {
if (!compactingMemStore.hasImmutableSegments()) return false; // no compaction on empty if (!compactingMemStore.hasImmutableSegments()) { // no compaction on empty pipeline
return false;
int t = compactingMemStore.getConfiguration().getInt(COMPACTING_MEMSTORE_TYPE_KEY,
COMPACTING_MEMSTORE_TYPE_DEFAULT);
switch (t) {
case 1: type = Type.COMPACT_TO_SKIPLIST_MAP;
break;
case 2: type = Type.COMPACT_TO_ARRAY_MAP;
break;
default: throw new RuntimeException("Unknown type " + type); // sanity check
} }
// get a snapshot of the list of the segments from the pipeline, // get a snapshot of the list of the segments from the pipeline,
// this local copy of the list is marked with specific version // this local copy of the list is marked with specific version
versionedList = compactingMemStore.getImmutableSegments(); versionedList = compactingMemStore.getImmutableSegments();
immutCellsNum = versionedList.getNumOfCells();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Starting the MemStore In-Memory Shrink of type " + type + " for store " LOG.debug("Starting the In-Memory Compaction for store "
+ compactingMemStore.getStore().getColumnFamilyName()); + compactingMemStore.getStore().getColumnFamilyName());
} }
@ -143,7 +133,14 @@ public class MemStoreCompactor {
} }
/**---------------------------------------------------------------------- /**----------------------------------------------------------------------
* Close the scanners and clear the pointers in order to allow good * The interface to check whether user requested the index-compaction
*/
public boolean isIndexCompaction() {
return (action == Action.MERGE);
}
/**----------------------------------------------------------------------
* Reset the interruption indicator and clear the pointers in order to allow good
* garbage collection * garbage collection
*/ */
private void releaseResources() { private void releaseResources() {
@ -152,45 +149,35 @@ public class MemStoreCompactor {
} }
/**---------------------------------------------------------------------- /**----------------------------------------------------------------------
* Check whether there are some signs to definitely not to flatten, * Decide what to do with the new and old segments in the compaction pipeline.
* returns false if we must compact. If this method returns true we * Implements basic in-memory compaction policy.
* still need to evaluate the compaction.
*/ */
private boolean shouldFlatten() { private Action policy() {
boolean userToFlatten = // the user configurable option to flatten or not to flatten
compactingMemStore.getConfiguration().getBoolean(MEMSTORE_COMPACTOR_FLATTENING, if (isInterrupted.get()) { // if the entire process is interrupted cancel flattening
MEMSTORE_COMPACTOR_FLATTENING_DEFAULT); return Action.NOOP; // the compaction also doesn't start when interrupted
if (userToFlatten==false) {
LOG.debug("In-Memory shrink is doing compaction, as user asked to avoid flattening");
return false; // the user doesn't want to flatten
} }
if (action == Action.COMPACT) { // compact according to the user request
LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName()
+ " is going to be compacted, number of"
+ " cells before compaction is " + versionedList.getNumOfCells());
return Action.COMPACT;
}
// compaction shouldn't happen or doesn't worth it
// limit the number of the segments in the pipeline // limit the number of the segments in the pipeline
int numOfSegments = versionedList.getNumOfSegments(); int numOfSegments = versionedList.getNumOfSegments();
if (numOfSegments > 3) { // hard-coded for now as it is going to move to policy if (numOfSegments > THRESHOLD_PIPELINE_SEGMENTS) {
LOG.debug("In-Memory shrink is doing compaction, as there already are " + numOfSegments LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName()
+ " segments in the compaction pipeline"); + " is going to be merged, as there are " + numOfSegments + " segments");
return false; // to avoid "too many open files later", compact now return Action.MERGE; // to avoid too many segments, merge now
} }
// till here we hvae all the signs that it is possible to flatten, run the speculative scan
// (if allowed by the user) to check the efficiency of compaction // if nothing of the above, then just flatten the newly joined segment
boolean avoidSpeculativeScan = // the user configurable option to avoid the speculative scan LOG.debug("The youngest segment in the in-Memory Compaction Pipeline for store "
compactingMemStore.getConfiguration().getBoolean(MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN, + compactingMemStore.getFamilyName() + " is going to be flattened");
MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN_DEFAULT); return Action.FLATTEN;
if (avoidSpeculativeScan==true) {
LOG.debug("In-Memory shrink is doing flattening, as user asked to avoid compaction "
+ "evaluation");
return true; // flatten without checking the compaction expedience
}
try {
immutCellsNum = countCellsForCompaction();
if (immutCellsNum > fraction * versionedList.getNumOfCells()) {
return true;
}
} catch(Exception e) {
return true;
}
return false;
} }
/**---------------------------------------------------------------------- /**----------------------------------------------------------------------
@ -201,95 +188,106 @@ public class MemStoreCompactor {
private void doCompaction() { private void doCompaction() {
ImmutableSegment result = null; ImmutableSegment result = null;
boolean resultSwapped = false; boolean resultSwapped = false;
Action nextStep = null;
try { try {
// PHASE I: estimate the compaction expedience - EVALUATE COMPACTION nextStep = policy();
if (shouldFlatten()) {
// too much cells "survive" the possible compaction, we do not want to compact! if (nextStep == Action.NOOP) {
LOG.debug("In-Memory compaction does not pay off - storing the flattened segment" return;
+ " for store: " + compactingMemStore.getFamilyName()); }
// Looking for Segment in the pipeline with SkipList index, to make it flat if (nextStep == Action.FLATTEN) {
// Youngest Segment in the pipeline is with SkipList index, make it flat
compactingMemStore.flattenOneSegment(versionedList.getVersion()); compactingMemStore.flattenOneSegment(versionedList.getVersion());
return; return;
} }
// PHASE II: create the new compacted ImmutableSegment - START COPY-COMPACTION // Create one segment representing all segments in the compaction pipeline,
// either by compaction or by merge
if (!isInterrupted.get()) { if (!isInterrupted.get()) {
result = compact(immutCellsNum); result = createSubstitution();
} }
// Phase III: swap the old compaction pipeline - END COPY-COMPACTION // Substitute the pipeline with one segment
if (!isInterrupted.get()) { if (!isInterrupted.get()) {
if (resultSwapped = compactingMemStore.swapCompactedSegments(versionedList, result)) { if (resultSwapped = compactingMemStore.swapCompactedSegments(
versionedList, result, (action==Action.MERGE))) {
// update the wal so it can be truncated and not get too long // update the wal so it can be truncated and not get too long
compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater
} }
} }
} catch (Exception e) { } catch (IOException e) {
LOG.debug("Interrupting the MemStore in-memory compaction for store " LOG.debug("Interrupting the MemStore in-memory compaction for store "
+ compactingMemStore.getFamilyName()); + compactingMemStore.getFamilyName());
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} finally { } finally {
if ((result != null) && (!resultSwapped)) result.close(); // For the MERGE case, if the result was created, but swap didn't happen,
// we DON'T need to close the result segment (meaning its MSLAB)!
// Because closing the result segment means closing the chunks of all segments
// in the compaction pipeline, which still have ongoing scans.
if (nextStep != Action.MERGE) {
if ((result != null) && (!resultSwapped)) {
result.close();
}
}
releaseResources(); releaseResources();
} }
} }
/**---------------------------------------------------------------------- /**----------------------------------------------------------------------
* The copy-compaction is the creation of the ImmutableSegment (from the relevant type) * Creation of the ImmutableSegment either by merge or copy-compact of the segments of the
* based on the Compactor Iterator. The new ImmutableSegment is returned. * pipeline, based on the Compactor Iterator. The new ImmutableSegment is returned.
*/ */
private ImmutableSegment compact(int numOfCells) throws IOException { private ImmutableSegment createSubstitution() throws IOException {
LOG.debug("In-Memory compaction does pay off - The estimated number of cells "
+ "after compaction is " + numOfCells + ", while number of cells before is " + versionedList
.getNumOfCells() + ". The fraction of remaining cells should be: " + fraction);
ImmutableSegment result = null; ImmutableSegment result = null;
MemStoreCompactorIterator iterator = MemStoreSegmentsIterator iterator = null;
new MemStoreCompactorIterator(versionedList.getStoreSegments(),
switch (action) {
case COMPACT:
iterator =
new MemStoreCompactorSegmentsIterator(versionedList.getStoreSegments(),
compactingMemStore.getComparator(), compactingMemStore.getComparator(),
compactionKVMax, compactingMemStore.getStore()); compactionKVMax, compactingMemStore.getStore());
try {
switch (type) { result = SegmentFactory.instance().createImmutableSegmentByCompaction(
case COMPACT_TO_SKIPLIST_MAP:
result = SegmentFactory.instance().createImmutableSegment(
compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator);
break;
case COMPACT_TO_ARRAY_MAP:
result = SegmentFactory.instance().createImmutableSegment(
compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
numOfCells, ImmutableSegment.Type.ARRAY_MAP_BASED); versionedList.getNumOfCells(), ImmutableSegment.Type.ARRAY_MAP_BASED);
break;
default: throw new RuntimeException("Unknown type " + type); // sanity check
}
} finally {
iterator.close(); iterator.close();
break;
case MERGE:
iterator =
new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(),
compactingMemStore.getComparator(),
compactionKVMax, compactingMemStore.getStore());
result = SegmentFactory.instance().createImmutableSegmentByMerge(
compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
versionedList.getNumOfCells(), ImmutableSegment.Type.ARRAY_MAP_BASED,
versionedList.getStoreSegments());
iterator.close();
break;
default: throw new RuntimeException("Unknown action " + action); // sanity check
} }
return result; return result;
} }
/**---------------------------------------------------------------------- /**----------------------------------------------------------------------
* Count cells to estimate the efficiency of the future compaction * Initiate the action according to user config, after its default is Action.MERGE
*/ */
private int countCellsForCompaction() throws IOException { @VisibleForTesting
void initiateAction() {
String memStoreType = compactingMemStore.getConfiguration().get(COMPACTING_MEMSTORE_TYPE_KEY,
COMPACTING_MEMSTORE_TYPE_DEFAULT);
int cnt = 0; switch (memStoreType) {
MemStoreCompactorIterator iterator = case INDEX_COMPACTION_CONFIG: action = Action.MERGE;
new MemStoreCompactorIterator( break;
versionedList.getStoreSegments(), compactingMemStore.getComparator(), case DATA_COMPACTION_CONFIG: action = Action.COMPACT;
compactionKVMax, compactingMemStore.getStore()); break;
default:
try { throw new RuntimeException("Unknown memstore type " + memStoreType); // sanity check
while (iterator.next() != null) { }
cnt++;
}
} finally {
iterator.close();
}
return cnt;
} }
} }

View File

@ -26,47 +26,33 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
/** /**
* The MemStoreCompactorIterator is designed to perform one iteration over given list of segments * The MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
* For another iteration new instance of MemStoreCompactorIterator needs to be created * and performs the scan for compaction operation meaning it is based on SQM
* The iterator is not thread-safe and must have only one instance in each period of time
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class MemStoreCompactorIterator implements Iterator<Cell> { public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator {
private List<Cell> kvs = new ArrayList<Cell>(); private List<Cell> kvs = new ArrayList<Cell>();
private boolean hasMore;
// scanner for full or partial pipeline (heap of segment scanners) private Iterator<Cell> kvsIterator;
// we need to keep those scanners in order to close them at the end
private KeyValueScanner scanner;
// scanner on top of pipeline scanner that uses ScanQueryMatcher // scanner on top of pipeline scanner that uses ScanQueryMatcher
private StoreScanner compactingScanner; private StoreScanner compactingScanner;
private final ScannerContext scannerContext;
private boolean hasMore;
private Iterator<Cell> kvsIterator;
// C-tor // C-tor
public MemStoreCompactorIterator(List<ImmutableSegment> segments, public MemStoreCompactorSegmentsIterator(
CellComparator comparator, int compactionKVMax, Store store) throws IOException { List<ImmutableSegment> segments,
CellComparator comparator, int compactionKVMax, Store store
this.scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); ) throws IOException {
super(segments,comparator,compactionKVMax,store);
// list of Scanners of segments in the pipeline, when compaction starts
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
// create the list of scanners with maximally possible read point, meaning that
// all KVs are going to be returned by the pipeline traversing
for (Segment segment : segments) {
scanners.add(segment.getScanner(store.getSmallestReadPoint()));
}
scanner = new MemStoreScanner(comparator, scanners, true);
// build the scanner based on Query Matcher
// reinitialize the compacting scanner for each instance of iterator // reinitialize the compacting scanner for each instance of iterator
compactingScanner = createScanner(store, scanner); compactingScanner = createScanner(store, scanner);
@ -80,17 +66,23 @@ public class MemStoreCompactorIterator implements Iterator<Cell> {
@Override @Override
public boolean hasNext() { public boolean hasNext() {
if (kvsIterator == null) { // for the case when the result is empty
return false;
}
if (!kvsIterator.hasNext()) { if (!kvsIterator.hasNext()) {
// refillKVS() method should be invoked only if !kvsIterator.hasNext() // refillKVS() method should be invoked only if !kvsIterator.hasNext()
if (!refillKVS()) { if (!refillKVS()) {
return false; return false;
} }
} }
return (kvsIterator.hasNext() || hasMore); return kvsIterator.hasNext();
} }
@Override @Override
public Cell next() { public Cell next() {
if (kvsIterator == null) { // for the case when the result is empty
return null;
}
if (!kvsIterator.hasNext()) { if (!kvsIterator.hasNext()) {
// refillKVS() method should be invoked only if !kvsIterator.hasNext() // refillKVS() method should be invoked only if !kvsIterator.hasNext()
if (!refillKVS()) return null; if (!refillKVS()) return null;
@ -128,7 +120,8 @@ public class MemStoreCompactorIterator implements Iterator<Cell> {
} }
/* Refill kev-value set (should be invoked only when KVS is empty)
* Returns true if KVS is non-empty */
private boolean refillKVS() { private boolean refillKVS() {
kvs.clear(); // clear previous KVS, first initiated in the constructor kvs.clear(); // clear previous KVS, first initiated in the constructor
if (!hasMore) { // if there is nothing expected next in compactingScanner if (!hasMore) { // if there is nothing expected next in compactingScanner
@ -153,7 +146,4 @@ public class MemStoreCompactorIterator implements Iterator<Cell> {
} }
return hasMore; return hasMore;
} }
} }

View File

@ -0,0 +1,68 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import java.io.IOException;
import java.util.List;
/**
* The MemStoreMergerSegmentsIterator extends MemStoreSegmentsIterator
* and performs the scan for simple merge operation meaning it is NOT based on SQM
*/
@InterfaceAudience.Private
public class MemStoreMergerSegmentsIterator extends MemStoreSegmentsIterator {
// C-tor
public MemStoreMergerSegmentsIterator(List<ImmutableSegment> segments, CellComparator comparator,
int compactionKVMax, Store store
) throws IOException {
super(segments,comparator,compactionKVMax,store);
}
@Override
public boolean hasNext() {
return (scanner.peek()!=null);
}
@Override
public Cell next() {
Cell result = null;
try { // try to get next
result = scanner.next();
} catch (IOException ie) {
throw new IllegalStateException(ie);
}
return result;
}
public void close() {
scanner.close();
scanner = null;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}

View File

@ -0,0 +1,64 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import java.io.IOException;
import java.util.*;
/**
* The MemStoreSegmentsIterator is designed to perform one iteration over given list of segments
* For another iteration new instance of MemStoreSegmentsIterator needs to be created
* The iterator is not thread-safe and must have only one instance per MemStore
* in each period of time
*/
@InterfaceAudience.Private
public abstract class MemStoreSegmentsIterator implements Iterator<Cell> {
// scanner for full or partial pipeline (heap of segment scanners)
// we need to keep those scanners in order to close them at the end
protected KeyValueScanner scanner;
protected final ScannerContext scannerContext;
// C-tor
public MemStoreSegmentsIterator(List<ImmutableSegment> segments, CellComparator comparator,
int compactionKVMax, Store store) throws IOException {
this.scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
// list of Scanners of segments in the pipeline, when compaction starts
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
// create the list of scanners with the smallest read point, meaning that
// only relevant KVs are going to be returned by the pipeline traversing
for (Segment segment : segments) {
scanners.add(segment.getScanner(store.getSmallestReadPoint()));
}
scanner = new MemStoreScanner(comparator, scanners, true);
}
public abstract void close();
}

View File

@ -25,6 +25,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.ReflectionUtils;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/** /**
* A singleton store segment factory. * A singleton store segment factory.
@ -46,18 +48,22 @@ public final class SegmentFactory {
// create skip-list-based (non-flat) immutable segment from compacting old immutable segments // create skip-list-based (non-flat) immutable segment from compacting old immutable segments
public ImmutableSegment createImmutableSegment(final Configuration conf, public ImmutableSegment createImmutableSegment(final Configuration conf,
final CellComparator comparator, MemStoreCompactorIterator iterator) { final CellComparator comparator, MemStoreSegmentsIterator iterator) {
return new ImmutableSegment(comparator, iterator, getMemStoreLAB(conf)); return new ImmutableSegment(comparator, iterator, getMemStoreLAB(conf));
} }
// create new flat immutable segment from compacting old immutable segment // create new flat immutable segment from compacting old immutable segments
public ImmutableSegment createImmutableSegment(final Configuration conf, public ImmutableSegment createImmutableSegmentByCompaction(final Configuration conf,
final CellComparator comparator, MemStoreCompactorIterator iterator, int numOfCells, final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
ImmutableSegment.Type segmentType) throws IOException { ImmutableSegment.Type segmentType)
Preconditions.checkArgument(segmentType != ImmutableSegment.Type.SKIPLIST_MAP_BASED, throws IOException {
Preconditions.checkArgument(segmentType == ImmutableSegment.Type.ARRAY_MAP_BASED,
"wrong immutable segment type"); "wrong immutable segment type");
return new ImmutableSegment(comparator, iterator, getMemStoreLAB(conf), numOfCells, MemStoreLAB memStoreLAB = getMemStoreLAB(conf);
segmentType); return
// the last parameter "false" means not to merge, but to compact the pipeline
// in order to create the new segment
new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, false);
} }
// create empty immutable segment // create empty immutable segment
@ -77,6 +83,19 @@ public final class SegmentFactory {
return generateMutableSegment(conf, comparator, memStoreLAB); return generateMutableSegment(conf, comparator, memStoreLAB);
} }
// create new flat immutable segment from merging old immutable segments
public ImmutableSegment createImmutableSegmentByMerge(final Configuration conf,
final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
ImmutableSegment.Type segmentType, List<ImmutableSegment> segments)
throws IOException {
Preconditions.checkArgument(segmentType == ImmutableSegment.Type.ARRAY_MAP_BASED,
"wrong immutable segment type");
MemStoreLAB memStoreLAB = getMergedMemStoreLAB(conf, segments);
return
// the last parameter "true" means to merge the compaction pipeline
// in order to create the new segment
new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, true);
}
//****** private methods to instantiate concrete store segments **********// //****** private methods to instantiate concrete store segments **********//
private MutableSegment generateMutableSegment(final Configuration conf, CellComparator comparator, private MutableSegment generateMutableSegment(final Configuration conf, CellComparator comparator,
@ -96,4 +115,11 @@ public final class SegmentFactory {
return memStoreLAB; return memStoreLAB;
} }
private MemStoreLAB getMergedMemStoreLAB(Configuration conf, List<ImmutableSegment> segments) {
List<MemStoreLAB> mslabs = new ArrayList<MemStoreLAB>();
for (ImmutableSegment segment : segments) {
mslabs.add(segment.getMemStoreLAB());
}
return new ImmutableMemStoreLAB(mslabs);
}
} }

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -43,7 +44,7 @@ public class VersionedSegmentsList {
this.version = version; this.version = version;
} }
public LinkedList<ImmutableSegment> getStoreSegments() { public List<ImmutableSegment> getStoreSegments() {
return storeSegments; return storeSegments;
} }

View File

@ -508,6 +508,11 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
@Test @Test
public void testPuttingBackChunksWithOpeningPipelineScanner() public void testPuttingBackChunksWithOpeningPipelineScanner()
throws IOException { throws IOException {
// set memstore to do data compaction and not to use the speculative scan
memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction");
((CompactingMemStore)memstore).initiateType();
byte[] row = Bytes.toBytes("testrow"); byte[] row = Bytes.toBytes("testrow");
byte[] fam = Bytes.toBytes("testfamily"); byte[] fam = Bytes.toBytes("testfamily");
byte[] qf1 = Bytes.toBytes("testqualifier1"); byte[] qf1 = Bytes.toBytes("testqualifier1");
@ -585,6 +590,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
@Test @Test
public void testCompaction1Bucket() throws IOException { public void testCompaction1Bucket() throws IOException {
// set memstore to do data compaction and not to use the speculative scan
memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction");
((CompactingMemStore)memstore).initiateType();
String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4 String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4
// test 1 bucket // test 1 bucket
@ -609,6 +618,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
@Test @Test
public void testCompaction2Buckets() throws IOException { public void testCompaction2Buckets() throws IOException {
// set memstore to do data compaction and not to use the speculative scan
memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction");
((CompactingMemStore)memstore).initiateType();
String[] keys1 = { "A", "A", "B", "C" }; String[] keys1 = { "A", "A", "B", "C" };
String[] keys2 = { "A", "B", "D" }; String[] keys2 = { "A", "B", "D" };
@ -647,6 +659,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
@Test @Test
public void testCompaction3Buckets() throws IOException { public void testCompaction3Buckets() throws IOException {
// set memstore to do data compaction and not to use the speculative scan
memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction");
((CompactingMemStore)memstore).initiateType();
String[] keys1 = { "A", "A", "B", "C" }; String[] keys1 = { "A", "A", "B", "C" };
String[] keys2 = { "A", "B", "D" }; String[] keys2 = { "A", "B", "D" };
String[] keys3 = { "D", "B", "B" }; String[] keys3 = { "D", "B", "B" };

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -61,7 +62,8 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
compactingSetUp(); compactingSetUp();
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
conf.setLong("hbase.hregion.compacting.memstore.type", 2); // compact to CellArrayMap // set memstore to do data compaction and not to use the speculative scan
conf.set("hbase.hregion.compacting.memstore.type", "data-compaction");
this.memstore = this.memstore =
new CompactingMemStore(conf, CellComparator.COMPARATOR, store, new CompactingMemStore(conf, CellComparator.COMPARATOR, store,
@ -215,18 +217,17 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
} }
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
// Flattening tests // Merging tests
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@Test @Test
public void testFlattening() throws IOException { public void testMerging() throws IOException {
String[] keys1 = { "A", "A", "B", "C", "F", "H"}; String[] keys1 = { "A", "A", "B", "C", "F", "H"};
String[] keys2 = { "A", "B", "D", "G", "I", "J"}; String[] keys2 = { "A", "B", "D", "G", "I", "J"};
String[] keys3 = { "D", "B", "B", "E" }; String[] keys3 = { "D", "B", "B", "E" };
// set flattening to true memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "index-compaction");
memstore.getConfiguration().setBoolean("hbase.hregion.compacting.memstore.flatten", true); ((CompactingMemStore)memstore).initiateType();
addRowsByKeys(memstore, keys1); addRowsByKeys(memstore, keys1);
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline should not compact ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline should not compact
@ -238,13 +239,31 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
addRowsByKeys(memstore, keys2); // also should only flatten addRowsByKeys(memstore, keys2); // also should only flatten
int counter2 = 0;
for ( Segment s : memstore.getSegments()) {
counter2 += s.getCellsCount();
}
assertEquals(12, counter2);
((CompactingMemStore) memstore).disableCompaction(); ((CompactingMemStore) memstore).disableCompaction();
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening
assertEquals(0, memstore.getSnapshot().getCellsCount()); assertEquals(0, memstore.getSnapshot().getCellsCount());
int counter3 = 0;
for ( Segment s : memstore.getSegments()) {
counter3 += s.getCellsCount();
}
assertEquals(12, counter3);
addRowsByKeys(memstore, keys3); addRowsByKeys(memstore, keys3);
int counter4 = 0;
for ( Segment s : memstore.getSegments()) {
counter4 += s.getCellsCount();
}
assertEquals(16, counter4);
((CompactingMemStore) memstore).enableCompaction(); ((CompactingMemStore) memstore).enableCompaction();
@ -258,7 +277,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
for ( Segment s : memstore.getSegments()) { for ( Segment s : memstore.getSegments()) {
counter += s.getCellsCount(); counter += s.getCellsCount();
} }
assertEquals(10,counter); assertEquals(16,counter);
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
ImmutableSegment s = memstore.getSnapshot(); ImmutableSegment s = memstore.getSnapshot();
@ -295,7 +314,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
Threads.sleep(10); Threads.sleep(10);
} }
// Just doing the cnt operation here // Just doing the cnt operation here
MemStoreCompactorIterator itr = new MemStoreCompactorIterator( MemStoreSegmentsIterator itr = new MemStoreMergerSegmentsIterator(
((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(), ((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(),
CellComparator.COMPARATOR, 10, ((CompactingMemStore) memstore).getStore()); CellComparator.COMPARATOR, 10, ((CompactingMemStore) memstore).getStore());
int cnt = 0; int cnt = 0;

View File

@ -172,7 +172,7 @@ public class TestMemStoreLAB {
public void testLABChunkQueue() throws Exception { public void testLABChunkQueue() throws Exception {
HeapMemStoreLAB mslab = new HeapMemStoreLAB(); HeapMemStoreLAB mslab = new HeapMemStoreLAB();
// by default setting, there should be no chunk queue initialized // by default setting, there should be no chunk queue initialized
assertNull(mslab.getChunkQueue()); assertNull(mslab.getPooledChunks());
// reset mslab with chunk pool // reset mslab with chunk pool
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
conf.setDouble(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.1); conf.setDouble(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.1);
@ -209,7 +209,7 @@ public class TestMemStoreLAB {
// close the mslab // close the mslab
mslab.close(); mslab.close();
// make sure all chunks reclaimed or removed from chunk queue // make sure all chunks reclaimed or removed from chunk queue
int queueLength = mslab.getChunkQueue().size(); int queueLength = mslab.getPooledChunks().size();
assertTrue("All chunks in chunk queue should be reclaimed or removed" assertTrue("All chunks in chunk queue should be reclaimed or removed"
+ " after mslab closed but actually: " + queueLength, queueLength == 0); + " after mslab closed but actually: " + queueLength, queueLength == 0);
} }

View File

@ -130,7 +130,7 @@ public class TestPerColumnFamilyFlush {
conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
100 * 1024); 100 * 1024);
// Intialize the region // Intialize the region
Region region = initHRegion("testSelectiveFlushWhenEnabled", conf); Region region = initHRegion("testSelectiveFlushWithDataCompaction", conf);
// Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
for (int i = 1; i <= 1200; i++) { for (int i = 1; i <= 1200; i++) {
region.put(createPut(1, i)); region.put(createPut(1, i));

View File

@ -36,11 +36,13 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -121,7 +123,7 @@ public class TestWalAndCompactingMemStoreFlush {
} }
@Test(timeout = 180000) @Test(timeout = 180000)
public void testSelectiveFlushWhenEnabled() throws IOException { public void testSelectiveFlushWithDataCompaction() throws IOException {
// Set up the configuration // Set up the configuration
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
@ -130,9 +132,11 @@ public class TestWalAndCompactingMemStoreFlush {
FlushNonSloppyStoresFirstPolicy.class.getName()); FlushNonSloppyStoresFirstPolicy.class.getName());
conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 200 * 1024); conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 200 * 1024);
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25); conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25);
// set memstore to do data compaction
conf.set("hbase.hregion.compacting.memstore.type", "data-compaction");
// Intialize the region // Intialize the region
Region region = initHRegion("testSelectiveFlushWhenEnabled", conf); Region region = initHRegion("testSelectiveFlushWithDataCompaction", conf);
// Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
for (int i = 1; i <= 1200; i++) { for (int i = 1; i <= 1200; i++) {
@ -330,7 +334,7 @@ public class TestWalAndCompactingMemStoreFlush {
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
assertTrue( assertTrue(
CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD < cf1MemstoreSizePhaseV); CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD <= cf1MemstoreSizePhaseV);
assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
cf2MemstoreSizePhaseV); cf2MemstoreSizePhaseV);
assertEquals(CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, assertEquals(CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
@ -371,9 +375,13 @@ public class TestWalAndCompactingMemStoreFlush {
HBaseTestingUtility.closeRegionAndWAL(region); HBaseTestingUtility.closeRegionAndWAL(region);
} }
/*------------------------------------------------------------------------------*/
/* Check the same as above but for index-compaction type of compacting memstore */
@Test(timeout = 180000) @Test(timeout = 180000)
public void testSelectiveFlushWhenEnabledNoFlattening() throws IOException { public void testSelectiveFlushWithIndexCompaction() throws IOException {
/*------------------------------------------------------------------------------*/
/* SETUP */
// Set up the configuration // Set up the configuration
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024); conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);
@ -381,18 +389,17 @@ public class TestWalAndCompactingMemStoreFlush {
FlushNonSloppyStoresFirstPolicy.class.getName()); FlushNonSloppyStoresFirstPolicy.class.getName());
conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 200 * 1024); conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 200 * 1024);
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5); conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
// set memstore to index-compaction
conf.set("hbase.hregion.compacting.memstore.type", "index-compaction");
// set memstore segment flattening to false and compact to skip-list // Initialize the region
conf.setBoolean("hbase.hregion.compacting.memstore.flatten", false); Region region = initHRegion("testSelectiveFlushWithIndexCompaction", conf);
conf.setInt("hbase.hregion.compacting.memstore.type",1);
// Intialize the region
Region region = initHRegion("testSelectiveFlushWhenEnabled", conf);
/*------------------------------------------------------------------------------*/
/* PHASE I - insertions */
// Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
for (int i = 1; i <= 1200; i++) { for (int i = 1; i <= 1200; i++) {
region.put(createPut(1, i)); // compacted memstore region.put(createPut(1, i)); // compacted memstore
if (i <= 100) { if (i <= 100) {
region.put(createPut(2, i)); region.put(createPut(2, i));
if (i <= 50) { if (i <= 50) {
@ -400,41 +407,32 @@ public class TestWalAndCompactingMemStoreFlush {
} }
} }
} }
// Now add more puts for CF2, so that we only flush CF2 to disk // Now add more puts for CF2, so that we only flush CF2 to disk
for (int i = 100; i < 2000; i++) { for (int i = 100; i < 2000; i++) {
region.put(createPut(2, i)); region.put(createPut(2, i));
} }
long totalMemstoreSize = region.getMemstoreSize(); /*------------------------------------------------------------------------------*/
/*------------------------------------------------------------------------------*/
/* PHASE I - collect sizes */
long totalMemstoreSizePhaseI = region.getMemstoreSize();
// Find the smallest LSNs for edits wrt to each CF. // Find the smallest LSNs for edits wrt to each CF.
long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1); long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1);
long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2); long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2);
long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3); long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3);
// Find the sizes of the memstores of each CF. // Find the sizes of the memstores of each CF.
long cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); long cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
long cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); long cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
long cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); long cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
// Get the overall smallest LSN in the region's memstores. // Get the overall smallest LSN in the region's memstores.
long smallestSeqInRegionCurrentMemstorePhaseI = getWAL(region) long smallestSeqInRegionCurrentMemstorePhaseI = getWAL(region)
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
String s = "\n\n----------------------------------\n" /*------------------------------------------------------------------------------*/
+ "Upon initial insert and before any flush, size of CF1 is:" /* PHASE I - validation */
+ cf1MemstoreSizePhaseI + ", is CF1 compacted memstore?:"
+ region.getStore(FAMILY1).isSloppyMemstore() + ". Size of CF2 is:"
+ cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:"
+ region.getStore(FAMILY2).isSloppyMemstore() + ". Size of CF3 is:"
+ cf3MemstoreSizePhaseI + ", is CF3 compacted memstore?:"
+ region.getStore(FAMILY3).isSloppyMemstore() + "\n";
// The overall smallest LSN in the region's memstores should be the same as // The overall smallest LSN in the region's memstores should be the same as
// the LSN of the smallest edit in CF1 // the LSN of the smallest edit in CF1
assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI); assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI);
// Some other sanity checks. // Some other sanity checks.
assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI); assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI);
assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI); assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI);
@ -444,149 +442,169 @@ public class TestWalAndCompactingMemStoreFlush {
// The total memstore size should be the same as the sum of the sizes of // The total memstore size should be the same as the sum of the sizes of
// memstores of CF1, CF2 and CF3. // memstores of CF1, CF2 and CF3.
String msg = "totalMemstoreSize="+totalMemstoreSize + assertEquals(
" DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD + totalMemstoreSizePhaseI
" cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI + + 1 * DefaultMemStore.DEEP_OVERHEAD
" cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI + + 2 * CompactingMemStore.DEEP_OVERHEAD
" cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ; + 3 * MutableSegment.DEEP_OVERHEAD,
assertEquals(msg,
totalMemstoreSize + 2 * (CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD)
+ (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD),
cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI); cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI);
// Flush!!!!!!!!!!!!!!!!!!!!!! /*------------------------------------------------------------------------------*/
// We have big compacting memstore CF1 and two small memstores: /* PHASE I - Flush */
// CF2 (not compacted) and CF3 (compacting) // First Flush in Test!!!!!!!!!!!!!!!!!!!!!!
// All together they are above the flush size lower bound. // CF1, CF2, CF3, all together they are above the flush size lower bound.
// Since CF1 and CF3 should be flushed to memory (not to disk), // Since CF1 and CF3 are compacting, CF2 is going to be flushed to disk.
// CF2 is going to be flushed to disk. // CF1 and CF3 - flushed to memory and flatten explicitly
// CF1 - nothing to compact, CF3 - should be twice compacted region.flush(false);
CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore; CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore; CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
cms1.flushInMemory(); cms1.flushInMemory();
cms3.flushInMemory(); cms3.flushInMemory();
region.flush(false);
// CF3/CF1 should be merged so wait here to be sure the compaction is done
while (((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
.isMemStoreFlushingInMemory()) {
Threads.sleep(10);
}
while (((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore)
.isMemStoreFlushingInMemory()) {
Threads.sleep(10);
}
/*------------------------------------------------------------------------------*/
/*------------------------------------------------------------------------------*/
/* PHASE II - collect sizes */
// Recalculate everything // Recalculate everything
long cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize(); long cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
long cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize(); long cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize();
long smallestSeqInRegionCurrentMemstorePhaseII = getWAL(region) long smallestSeqInRegionCurrentMemstorePhaseII = getWAL(region)
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
// Find the smallest LSNs for edits wrt to each CF. // Find the smallest LSNs for edits wrt to each CF.
long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
long totalMemstoreSizePhaseII = region.getMemstoreSize();
s = s + "DefaultMemStore DEEP_OVERHEAD is:" + DefaultMemStore.DEEP_OVERHEAD /*------------------------------------------------------------------------------*/
+ ", CompactingMemStore DEEP_OVERHEAD is:" + CompactingMemStore.DEEP_OVERHEAD /* PHASE II - validation */
+ "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n" // CF1 was flushed to memory, should be flattened and take less space
+ "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII assertTrue(cf1MemstoreSizePhaseII < cf1MemstoreSizePhaseI);
+ ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n";
// CF1 was flushed to memory, but there is nothing to compact, should
// remain the same size plus renewed empty skip-list
assertEquals(s, cf1MemstoreSizePhaseII, cf1MemstoreSizePhaseI
+ ImmutableSegment.DEEP_OVERHEAD_CAM + CompactionPipeline.ENTRY_OVERHEAD);
// CF2 should become empty // CF2 should become empty
assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
cf2MemstoreSizePhaseII); cf2MemstoreSizePhaseII);
// verify that CF3 was flushed to memory and was not compacted (this is an approximation check)
// verify that CF3 was flushed to memory and was compacted (this is approximation check) // if compacted CF# should be at least twice less because its every key was duplicated
assertTrue(cf3MemstoreSizePhaseI / 2 + CompactingMemStore.DEEP_OVERHEAD
+ ImmutableSegment.DEEP_OVERHEAD_CAM
+ CompactionPipeline.ENTRY_OVERHEAD > cf3MemstoreSizePhaseII);
assertTrue(cf3MemstoreSizePhaseI / 2 < cf3MemstoreSizePhaseII); assertTrue(cf3MemstoreSizePhaseI / 2 < cf3MemstoreSizePhaseII);
// Now the smallest LSN in the region should be the same as the smallest // Now the smallest LSN in the region should be the same as the smallest
// LSN in the memstore of CF1. // LSN in the memstore of CF1.
assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI); assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI);
// The total memstore size should be the same as the sum of the sizes of
// memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline
// items in CF1/2
assertEquals(
totalMemstoreSizePhaseII
+ 1 * DefaultMemStore.DEEP_OVERHEAD
+ 2 * CompactingMemStore.DEEP_OVERHEAD
+ 3 * MutableSegment.DEEP_OVERHEAD
+ 2 * CompactionPipeline.ENTRY_OVERHEAD
+ 2 * ImmutableSegment.DEEP_OVERHEAD_CAM,
cf1MemstoreSizePhaseII + cf2MemstoreSizePhaseII + cf3MemstoreSizePhaseII);
/*------------------------------------------------------------------------------*/
/*------------------------------------------------------------------------------*/
/* PHASE III - insertions */
// Now add more puts for CF1, so that we also flush CF1 to disk instead of // Now add more puts for CF1, so that we also flush CF1 to disk instead of
// memory in next flush // memory in next flush. This is causing the CF! to be flushed to memory twice.
for (int i = 1200; i < 2000; i++) { for (int i = 1200; i < 8000; i++) {
region.put(createPut(1, i)); region.put(createPut(1, i));
} }
s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseII // CF1 should be flatten and merged so wait here to be sure the compaction is done
+ ", the smallest sequence in CF1:" + smallestSeqCF1PhaseII + ", " + while (((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
"the smallest sequence in CF2:" .isMemStoreFlushingInMemory()) {
+ smallestSeqCF2PhaseII +", the smallest sequence in CF3:" + smallestSeqCF3PhaseII + "\n"; Threads.sleep(10);
}
// How much does the CF1 memstore occupy? Will be used later. /*------------------------------------------------------------------------------*/
/* PHASE III - collect sizes */
// How much does the CF1 memstore occupy now? Will be used later.
long cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize(); long cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize();
long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1); long totalMemstoreSizePhaseIII = region.getMemstoreSize();
s = s + "----After more puts into CF1 its size is:" + cf1MemstoreSizePhaseIII /*------------------------------------------------------------------------------*/
+ ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n" ; /* PHASE III - validation */
// The total memstore size should be the same as the sum of the sizes of
// memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline
// items in CF1/2
assertEquals(
totalMemstoreSizePhaseIII
+ 1 * DefaultMemStore.DEEP_OVERHEAD
+ 2 * CompactingMemStore.DEEP_OVERHEAD
+ 3 * MutableSegment.DEEP_OVERHEAD
+ 2 * CompactionPipeline.ENTRY_OVERHEAD
+ 2 * ImmutableSegment.DEEP_OVERHEAD_CAM,
cf1MemstoreSizePhaseIII + cf2MemstoreSizePhaseII + cf3MemstoreSizePhaseII);
/*------------------------------------------------------------------------------*/
// Flush!!!!!!!!!!!!!!!!!!!!!! /* PHASE III - Flush */
// Flush again, CF1 is flushed to disk // Second Flush in Test!!!!!!!!!!!!!!!!!!!!!!
// CF2 is flushed to disk, because it is not in-memory compacted memstore // CF1 is flushed to disk, but not entirely emptied.
// CF3 is flushed empty to memory (actually nothing happens to CF3) // CF2 was and remained empty, same way nothing happens to CF3
region.flush(false); region.flush(false);
/*------------------------------------------------------------------------------*/
/*------------------------------------------------------------------------------*/
/* PHASE IV - collect sizes */
// Recalculate everything // Recalculate everything
long cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize(); long cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize();
long cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize(); long cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
long cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize(); long cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize();
long smallestSeqInRegionCurrentMemstorePhaseIV = getWAL(region) long smallestSeqInRegionCurrentMemstorePhaseIV = getWAL(region)
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3); long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
s = s + "----After SECOND FLUSH, CF1 size is:" + cf1MemstoreSizePhaseIV + ", CF2 size is:" /*------------------------------------------------------------------------------*/
+ cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV /* PHASE IV - validation */
+ "\n"; // CF1's biggest pipeline component (inserted before first flush) should be flushed to disk
// CF2 should remain empty
s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV assertTrue(cf1MemstoreSizePhaseIII > cf1MemstoreSizePhaseIV);
+ ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " +
"the smallest sequence in CF2:"
+ smallestSeqCF2PhaseIV +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIV
+ "\n";
// CF1's pipeline component (inserted before first flush) should be flushed to disk
// CF2 should be flushed to disk
assertEquals(cf1MemstoreSizePhaseIII - cf1MemstoreSizePhaseI + CompactingMemStore.DEEP_OVERHEAD
+ MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseIV);
assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
cf2MemstoreSizePhaseIV); cf2MemstoreSizePhaseIV);
// CF3 shouldn't have been touched. // CF3 shouldn't have been touched.
assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII); assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
// the smallest LSN of CF3 shouldn't change // the smallest LSN of CF3 shouldn't change
assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV); assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV);
// CF3 should be bottleneck for WAL // CF3 should be bottleneck for WAL
assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV); assertEquals(smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV);
// Flush!!!!!!!!!!!!!!!!!!!!!! /*------------------------------------------------------------------------------*/
// Clearing the existing memstores, CF2 all flushed to disk. The single /* PHASE IV - Flush */
// memstore segment in the compaction pipeline of CF1 and CF3 should be flushed to disk. // Third Flush in Test!!!!!!!!!!!!!!!!!!!!!!
// Note that active sets of CF1 and CF3 are empty // Force flush to disk on all memstores (flush parameter true).
// CF1/CF3 all flushed to disk. Note that active sets of CF1 and CF3 are empty
region.flush(true); region.flush(true);
/*------------------------------------------------------------------------------*/
/*------------------------------------------------------------------------------*/
/* PHASE V - collect sizes */
// Recalculate everything // Recalculate everything
long cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize(); long cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize();
long cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize(); long cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize();
long cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize(); long cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize();
long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region) long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region)
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long totalMemstoreSizePhaseV = region.getMemstoreSize();
/*------------------------------------------------------------------------------*/
/* PHASE V - validation */
assertEquals(CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, assertEquals(CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
cf1MemstoreSizePhaseV); cf1MemstoreSizePhaseV);
assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
cf2MemstoreSizePhaseV); cf2MemstoreSizePhaseV);
assertEquals(CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, assertEquals(CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
cf3MemstoreSizePhaseV); cf3MemstoreSizePhaseV);
// The total memstores size should be empty
assertEquals(totalMemstoreSizePhaseV, 0);
// Because there is nothing in any memstore the WAL's LSN should be -1 // Because there is nothing in any memstore the WAL's LSN should be -1
assertEquals(smallestSeqInRegionCurrentMemstorePhaseV, HConstants.NO_SEQNUM); assertEquals(smallestSeqInRegionCurrentMemstorePhaseV, HConstants.NO_SEQNUM);
@ -594,6 +612,9 @@ public class TestWalAndCompactingMemStoreFlush {
// any Column Family above the threshold? // any Column Family above the threshold?
// In that case, we should flush all the CFs. // In that case, we should flush all the CFs.
/*------------------------------------------------------------------------------*/
/*------------------------------------------------------------------------------*/
/* PHASE VI - insertions */
// The memstore limit is 200*1024 and the column family flush threshold is // The memstore limit is 200*1024 and the column family flush threshold is
// around 50*1024. We try to just hit the memstore limit with each CF's // around 50*1024. We try to just hit the memstore limit with each CF's
// memstore being below the CF flush threshold. // memstore being below the CF flush threshold.
@ -605,24 +626,32 @@ public class TestWalAndCompactingMemStoreFlush {
region.put(createPut(5, i)); region.put(createPut(5, i));
} }
long cf1ActiveSizePhaseVI = region.getStore(FAMILY1).getMemStoreSize();
long cf3ActiveSizePhaseVI = region.getStore(FAMILY3).getMemStoreSize();
long cf5ActiveSizePhaseVI = region.getStore(FAMILIES[4]).getMemStoreSize();
/*------------------------------------------------------------------------------*/
/* PHASE VI - Flush */
// Fourth Flush in Test!!!!!!!!!!!!!!!!!!!!!!
// None among compacting memstores was flushed to memory due to previous puts.
// But is going to be moved to pipeline and flatten due to the flush.
region.flush(false); region.flush(false);
s = s + "----AFTER THIRD AND FORTH FLUSH, The smallest sequence in region WAL is: "
+ smallestSeqInRegionCurrentMemstorePhaseV
+ ". After additional inserts and last flush, the entire region size is:" + region
.getMemstoreSize()
+ "\n----------------------------------\n";
// Since we won't find any CF above the threshold, and hence no specific // Since we won't find any CF above the threshold, and hence no specific
// store to flush, we should flush all the memstores // store to flush, we should flush all the memstores
// Also compacted memstores are flushed to disk. // Also compacted memstores are flushed to disk, but not entirely emptied
assertEquals(0, region.getMemstoreSize()); long cf1ActiveSizePhaseVII = region.getStore(FAMILY1).getMemStoreSize();
System.out.println(s); long cf3ActiveSizePhaseVII = region.getStore(FAMILY3).getMemStoreSize();
long cf5ActiveSizePhaseVII = region.getStore(FAMILIES[4]).getMemStoreSize();
assertTrue(cf1ActiveSizePhaseVII < cf1ActiveSizePhaseVI);
assertTrue(cf3ActiveSizePhaseVII < cf3ActiveSizePhaseVI);
assertTrue(cf5ActiveSizePhaseVII < cf5ActiveSizePhaseVI);
HBaseTestingUtility.closeRegionAndWAL(region); HBaseTestingUtility.closeRegionAndWAL(region);
} }
@Test(timeout = 180000) @Test(timeout = 180000)
public void testSelectiveFlushWhenEnabledAndWALinCompaction() throws IOException { public void testSelectiveFlushAndWALinDataCompaction() throws IOException {
// Set up the configuration // Set up the configuration
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024); conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);
@ -631,9 +660,11 @@ public class TestWalAndCompactingMemStoreFlush {
conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 200 * conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 200 *
1024); 1024);
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5); conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
// set memstore to do data compaction and not to use the speculative scan
conf.set("hbase.hregion.compacting.memstore.type", "data-compaction");
// Intialize the HRegion // Intialize the HRegion
HRegion region = initHRegion("testSelectiveFlushWhenNotEnabled", conf); HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf);
// Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
for (int i = 1; i <= 1200; i++) { for (int i = 1; i <= 1200; i++) {
region.put(createPut(1, i)); region.put(createPut(1, i));
@ -752,6 +783,253 @@ public class TestWalAndCompactingMemStoreFlush {
HBaseTestingUtility.closeRegionAndWAL(region); HBaseTestingUtility.closeRegionAndWAL(region);
} }
@Test(timeout = 180000)
public void testSelectiveFlushAndWALinIndexCompaction() throws IOException {
// Set up the configuration
Configuration conf = HBaseConfiguration.create();
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);
conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
FlushNonSloppyStoresFirstPolicy.class.getName());
conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
200 * 1024);
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
// set memstore to do data compaction and not to use the speculative scan
conf.set("hbase.hregion.compacting.memstore.type", "index-compaction");
// Intialize the HRegion
HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf);
// Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
for (int i = 1; i <= 1200; i++) {
region.put(createPut(1, i));
if (i <= 100) {
region.put(createPut(2, i));
if (i <= 50) {
region.put(createPut(3, i));
}
}
}
// Now add more puts for CF2, so that we only flush CF2 to disk
for (int i = 100; i < 2000; i++) {
region.put(createPut(2, i));
}
long totalMemstoreSize = region.getMemstoreSize();
// Find the sizes of the memstores of each CF.
long cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
long cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
long cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
// Some other sanity checks.
assertTrue(cf1MemstoreSizePhaseI > 0);
assertTrue(cf2MemstoreSizePhaseI > 0);
assertTrue(cf3MemstoreSizePhaseI > 0);
// The total memstore size should be the same as the sum of the sizes of
// memstores of CF1, CF2 and CF3.
assertEquals(
totalMemstoreSize
+ 1 * DefaultMemStore.DEEP_OVERHEAD
+ 2 * CompactingMemStore.DEEP_OVERHEAD
+ 3 * MutableSegment.DEEP_OVERHEAD,
cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI);
// Flush!
((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory();
((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory();
// CF1 and CF3 should be compacted so wait here to be sure the compaction is done
while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore)
.isMemStoreFlushingInMemory()) {
Threads.sleep(10);
}
while (((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore)
.isMemStoreFlushingInMemory()) {
Threads.sleep(10);
}
region.flush(false);
long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
long smallestSeqInRegionCurrentMemstorePhaseII = region.getWAL()
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
// CF2 should have been cleared
assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
cf2MemstoreSizePhaseII);
// Add same entries to compact them later
for (int i = 1; i <= 1200; i++) {
region.put(createPut(1, i));
if (i <= 100) {
region.put(createPut(2, i));
if (i <= 50) {
region.put(createPut(3, i));
}
}
}
// Now add more puts for CF2, so that we only flush CF2 to disk
for (int i = 100; i < 2000; i++) {
region.put(createPut(2, i));
}
long smallestSeqInRegionCurrentMemstorePhaseIII = region.getWAL()
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);
long smallestSeqCF2PhaseIII = region.getOldestSeqIdOfStore(FAMILY2);
long smallestSeqCF3PhaseIII = region.getOldestSeqIdOfStore(FAMILY3);
// Flush!
((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory();
((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory();
// CF1 and CF3 should be compacted so wait here to be sure the compaction is done
while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore)
.isMemStoreFlushingInMemory()) {
Threads.sleep(10);
}
while (((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore)
.isMemStoreFlushingInMemory()) {
Threads.sleep(10);
}
region.flush(false);
long smallestSeqInRegionCurrentMemstorePhaseIV = region.getWAL()
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
// now check that the LSN of the entire WAL, of CF1 and of CF3 has NOT progressed due to merge
assertFalse(
smallestSeqInRegionCurrentMemstorePhaseIV > smallestSeqInRegionCurrentMemstorePhaseIII);
assertFalse(smallestSeqCF1PhaseIV > smallestSeqCF1PhaseIII);
assertFalse(smallestSeqCF3PhaseIV > smallestSeqCF3PhaseIII);
HBaseTestingUtility.closeRegionAndWAL(region);
}
// should end in 300 seconds (5 minutes)
@Test(timeout = 300000)
public void testStressFlushAndWALinIndexCompaction() throws IOException {
// Set up the configuration
Configuration conf = HBaseConfiguration.create();
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);
conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
FlushNonSloppyStoresFirstPolicy.class.getName());
conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
200 * 1024);
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
// set memstore to do data compaction and not to use the speculative scan
conf.set("hbase.hregion.compacting.memstore.type", "index-compaction");
// Successfully initialize the HRegion
HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf);
Thread[] threads = new Thread[25];
for (int i = 0; i < threads.length; i++) {
int id = i * 10000;
ConcurrentPutRunnable runnable = new ConcurrentPutRunnable(region, id);
threads[i] = new Thread(runnable);
threads[i].start();
}
Threads.sleep(10000); // let other threads start
region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts
Threads.sleep(10000); // let other threads continue
region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts
((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory();
((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory();
while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore)
.isMemStoreFlushingInMemory()) {
Threads.sleep(10);
}
while (((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore)
.isMemStoreFlushingInMemory()) {
Threads.sleep(10);
}
for (int i = 0; i < threads.length; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* The in-memory-flusher thread performs the flush asynchronously. There is at most one thread per
* memstore instance. It takes the updatesLock exclusively, pushes active into the pipeline,
* releases updatesLock and compacts the pipeline.
*/
private class ConcurrentPutRunnable implements Runnable {
private final HRegion stressedRegion;
private final int startNumber;
ConcurrentPutRunnable(HRegion r, int i) {
this.stressedRegion = r;
this.startNumber = i;
}
@Override
public void run() {
try {
int dummy = startNumber / 10000;
System.out.print("Thread " + dummy + " with start number " + startNumber + " starts\n");
// Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
for (int i = startNumber; i <= startNumber + 3000; i++) {
stressedRegion.put(createPut(1, i));
if (i <= startNumber + 2000) {
stressedRegion.put(createPut(2, i));
if (i <= startNumber + 1000) {
stressedRegion.put(createPut(3, i));
}
}
}
System.out.print("Thread with start number " + startNumber + " continues to more puts\n");
// Now add more puts for CF2, so that we only flush CF2 to disk
for (int i = startNumber + 3000; i < startNumber + 5000; i++) {
stressedRegion.put(createPut(2, i));
}
// And add more puts for CF1
for (int i = startNumber + 5000; i < startNumber + 7000; i++) {
stressedRegion.put(createPut(1, i));
}
System.out.print("Thread with start number " + startNumber + " flushes\n");
// flush (IN MEMORY) one of the stores (each thread flushes different store)
// and wait till the flush and the following action are done
if (startNumber == 0) {
((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore)
.flushInMemory();
while (((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore)
.isMemStoreFlushingInMemory()) {
Threads.sleep(10);
}
}
if (startNumber == 10000) {
((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore).flushInMemory();
while (((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore)
.isMemStoreFlushingInMemory()) {
Threads.sleep(10);
}
}
if (startNumber == 20000) {
((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY3)).memstore).flushInMemory();
while (((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY3)).memstore)
.isMemStoreFlushingInMemory()) {
Threads.sleep(10);
}
}
System.out.print("Thread with start number " + startNumber + " finishes\n");
} catch (IOException e) {
assert false;
}
}
}
private WAL getWAL(Region region) { private WAL getWAL(Region region) {
return ((HRegion)region).getWAL(); return ((HRegion)region).getWAL();
} }