HBASE-18010: CellChunkMap integration into CompactingMemStore. CellChunkMap usage is currently switched off by default. New tests are included. Review comments addressed.

This commit is contained in:
anastas 2017-07-05 12:35:21 +03:00
parent 8b63eb6fc6
commit 8ac4308411
23 changed files with 459 additions and 819 deletions

View File

@ -89,6 +89,15 @@ public class ClassSize {
/** Overhead for ConcurrentSkipListMap Entry */
public static final int CONCURRENT_SKIPLISTMAP_ENTRY;
/** Overhead for CellFlatMap */
public static final int CELL_FLAT_MAP;
/** Overhead for CellChunkMap */
public static final int CELL_CHUNK_MAP;
/** Overhead for Cell Chunk Map Entry */
public static final int CELL_CHUNK_MAP_ENTRY;
/** Overhead for CellArrayMap */
public static final int CELL_ARRAY_MAP;
@ -275,13 +284,17 @@ public class ClassSize {
// The size changes from jdk7 to jdk8, estimate the size rather than use a conditional
CONCURRENT_SKIPLISTMAP = (int) estimateBase(ConcurrentSkipListMap.class, false);
// CELL_ARRAY_MAP is the size of an instance of CellArrayMap class, which extends
// CellFlatMap class. CellArrayMap object containing a ref to an Array, so
// OBJECT + REFERENCE + ARRAY
// CellFlatMap object contains two integers, one boolean and one reference to object, so
// 2*INT + BOOLEAN + REFERENCE
CELL_ARRAY_MAP = align(OBJECT + 2*Bytes.SIZEOF_INT + Bytes.SIZEOF_BOOLEAN
+ ARRAY + 2*REFERENCE);
CELL_FLAT_MAP = OBJECT + 2*Bytes.SIZEOF_INT + Bytes.SIZEOF_BOOLEAN + REFERENCE;
// CELL_ARRAY_MAP is the size of an instance of CellArrayMap class, which extends
// CellFlatMap class. CellArrayMap object containing a ref to an Array of Cells
CELL_ARRAY_MAP = align(CELL_FLAT_MAP + REFERENCE + ARRAY);
// CELL_CHUNK_MAP is the size of an instance of CellChunkMap class, which extends
// CellFlatMap class. CellChunkMap object containing a ref to an Array of Chunks
CELL_CHUNK_MAP = align(CELL_FLAT_MAP + REFERENCE + ARRAY);
CONCURRENT_SKIPLISTMAP_ENTRY = align(
align(OBJECT + (3 * REFERENCE)) + /* one node per entry */
@ -290,6 +303,12 @@ public class ClassSize {
// REFERENCE in the CellArrayMap all the rest is counted in KeyValue.heapSize()
CELL_ARRAY_MAP_ENTRY = align(REFERENCE);
// The Cell Representation in the CellChunkMap, the Cell object size shouldn't be counted
// in KeyValue.heapSize()
// each cell-representation requires three integers for chunkID (reference to the ByteBuffer),
// offset and length, and one long for seqID
CELL_CHUNK_MAP_ENTRY = 3*Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG;
REENTRANT_LOCK = align(OBJECT + (3 * REFERENCE));
ATOMIC_LONG = align(OBJECT + Bytes.SIZEOF_LONG);

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.ClassSize;
import java.util.Comparator;
@ -56,11 +57,12 @@ import java.util.Comparator;
public class CellChunkMap extends CellFlatMap {
private final Chunk[] chunks; // the array of chunks, on which the index is based
private final int numOfCellsInsideChunk; // constant number of cell-representations in a chunk
// each cell-representation requires three integers for chunkID (reference to the ByteBuffer),
// offset and length, and one long for seqID
public static final int SIZEOF_CELL_REP = 3*Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG ;
// constant number of cell-representations in a chunk
// each chunk starts with its own ID following the cells data
public static final int NUM_OF_CELL_REPS_IN_CHUNK =
(ChunkCreator.getInstance().getChunkSize() - ChunkCreator.SIZEOF_CHUNK_HEADER) /
ClassSize.CELL_CHUNK_MAP_ENTRY;
/**
* C-tor for creating CellChunkMap from existing Chunk array, which must be ordered
@ -75,9 +77,6 @@ public class CellChunkMap extends CellFlatMap {
Chunk[] chunks, int min, int max, boolean descending) {
super(comparator, min, max, descending);
this.chunks = chunks;
this.numOfCellsInsideChunk = // each chunk starts with its own ID following the cells data
(ChunkCreator.getInstance().getChunkSize() - Bytes.SIZEOF_INT) / SIZEOF_CELL_REP;
}
/* To be used by base (CellFlatMap) class only to create a sub-CellFlatMap
@ -91,20 +90,20 @@ public class CellChunkMap extends CellFlatMap {
@Override
protected Cell getCell(int i) {
// get the index of the relevant chunk inside chunk array
int chunkIndex = (i / numOfCellsInsideChunk);
int chunkIndex = (i / NUM_OF_CELL_REPS_IN_CHUNK);
ByteBuffer block = chunks[chunkIndex].getData();// get the ByteBuffer of the relevant chunk
int j = i - chunkIndex * numOfCellsInsideChunk; // get the index of the cell-representation
int j = i - chunkIndex * NUM_OF_CELL_REPS_IN_CHUNK; // get the index of the cell-representation
// find the offset inside the chunk holding the index, skip bytes for chunk id
int offsetInBytes = Bytes.SIZEOF_INT + j* SIZEOF_CELL_REP;
int offsetInBytes = ChunkCreator.SIZEOF_CHUNK_HEADER + j* ClassSize.CELL_CHUNK_MAP_ENTRY;
// find the chunk holding the data of the cell, the chunkID is stored first
int chunkId = ByteBufferUtils.toInt(block, offsetInBytes);
Chunk chunk = ChunkCreator.getInstance().getChunk(chunkId);
if (chunk == null) {
// this should not happen, putting an assertion here at least for the testing period
assert false;
// this should not happen
throw new IllegalArgumentException("In CellChunkMap, cell must be associated with chunk."
+ ". We were looking for a cell at index " + i);
}
// find the offset of the data of the cell, skip integer for chunkID, offset is stored second
@ -118,8 +117,10 @@ public class CellChunkMap extends CellFlatMap {
ByteBuffer buf = chunk.getData(); // get the ByteBuffer where the cell data is stored
if (buf == null) {
// this should not happen, putting an assertion here at least for the testing period
assert false;
// this should not happen
throw new IllegalArgumentException("In CellChunkMap, chunk must be associated with ByteBuffer."
+ " Chunk: " + chunk + " Chunk ID: " + chunk.getId() + ", is from pool: "
+ chunk.isFromPool() + ". We were looking for a cell at index " + i);
}
return new ByteBufferChunkCell(buf, offsetOfCell, lengthOfCell, cellSeqID);

View File

@ -18,7 +18,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@ -35,6 +35,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
@ -51,9 +52,28 @@ public class ChunkCreator {
private AtomicInteger chunkID = new AtomicInteger(1);
// maps the chunk against the monotonically increasing chunk id. We need to preserve the
// natural ordering of the key
// CellChunkMap creation should convert the soft ref to hard reference
private Map<Integer, SoftReference<Chunk>> chunkIdMap =
new ConcurrentHashMap<Integer, SoftReference<Chunk>>();
// CellChunkMap creation should convert the weak ref to hard reference
// the chunk id of each chunk is the first integer written on that chunk;
// the header size needs to be changed in case the chunk id size is changed
public static final int SIZEOF_CHUNK_HEADER = Bytes.SIZEOF_INT;
// An object pointed to by a weak reference can be garbage collected, as opposed to an object
// referenced by a strong (regular) reference. Every chunk created via ChunkCreator is referenced
// from either weakChunkIdMap or strongChunkIdMap.
// Upon creation of chunk C, C's ID is mapped to a weak reference to C, in order not to disturb
// C's GC in case all other references to C are going to be removed.
// When chunk C is referenced from a CellChunkMap (via C's ID), it is still possible for chunk C
// to be garbage collected. To avoid that, upon inserting C into a CellChunkMap, C's ID is mapped
// to a strong (regular) reference to C.
// map that doesn't influence GC
private Map<Integer, WeakReference<Chunk>> weakChunkIdMap =
new ConcurrentHashMap<Integer, WeakReference<Chunk>>();
// map that keeps chunks from garbage collection
private Map<Integer, Chunk> strongChunkIdMap = new ConcurrentHashMap<Integer, Chunk>();
private final int chunkSize;
private final boolean offheap;
@VisibleForTesting
@ -119,8 +139,8 @@ public class ChunkCreator {
if (chunk == null) {
chunk = createChunk();
}
// put this chunk into the chunkIdMap
this.chunkIdMap.put(chunk.getId(), new SoftReference<>(chunk));
// put this chunk initially into the weakChunkIdMap
this.weakChunkIdMap.put(chunk.getId(), new WeakReference<>(chunk));
// now we need to actually do the expensive memory allocation step in case of a new chunk,
// else only the offset is set to the beginning of the chunk to accept allocations
chunk.init();
@ -148,12 +168,36 @@ public class ChunkCreator {
}
@VisibleForTesting
// TODO : To be used by CellChunkMap
// Used to translate the ChunkID into a chunk ref
Chunk getChunk(int id) {
SoftReference<Chunk> ref = chunkIdMap.get(id);
WeakReference<Chunk> ref = weakChunkIdMap.get(id);
if (ref != null) {
return ref.get();
}
// check also the strong mapping
return strongChunkIdMap.get(id);
}
// Promote the mapping of the given chunk ID from a weak reference to a strong (regular) one,
// so that the chunk is kept reachable through strongChunkIdMap.
// Returns the chunk now held strongly, or null when no live chunk is mapped to the ID.
Chunk saveChunkFromGC(int chunkID) {
  // already protected by a strong pointer? then there is nothing left to do
  Chunk alreadyStrong = strongChunkIdMap.get(chunkID);
  if (alreadyStrong != null) {
    return alreadyStrong;
  }

  WeakReference<Chunk> weakRef = weakChunkIdMap.get(chunkID);
  Chunk live = (weakRef == null) ? null : weakRef.get();
  if (live == null) {
    // We should actually never get here, as nobody should ask to save from GC a chunk
    // which is already released. However, this is not asserted here; the caller is left
    // to deal with the null return value and to assert if needed.
    return null;
  }

  // Insert into the strong map BEFORE removing from the weak map: getChunk() reads the
  // weak map first and the strong map second, so the ID stays resolvable throughout.
  this.strongChunkIdMap.put(chunkID, live);
  this.weakChunkIdMap.remove(chunkID);
  return live;
}
@ -166,25 +210,30 @@ public class ChunkCreator {
}
private void removeChunks(Set<Integer> chunkIDs) {
this.chunkIdMap.keySet().removeAll(chunkIDs);
this.weakChunkIdMap.keySet().removeAll(chunkIDs);
this.strongChunkIdMap.keySet().removeAll(chunkIDs);
}
Chunk removeChunk(int chunkId) {
SoftReference<Chunk> ref = this.chunkIdMap.remove(chunkId);
if (ref != null) {
return ref.get();
WeakReference<Chunk> weak = this.weakChunkIdMap.remove(chunkId);
Chunk strong = this.strongChunkIdMap.remove(chunkId);
if (weak != null) {
return weak.get();
}
return null;
return strong;
}
@VisibleForTesting
// the chunks in the weakChunkIdMap may already be released so we shouldn't rely
// on this count for strong correctness. This method is used only in testing.
int size() {
return this.chunkIdMap.size();
return this.weakChunkIdMap.size()+this.strongChunkIdMap.size();
}
@VisibleForTesting
void clearChunkIds() {
this.chunkIdMap.clear();
this.strongChunkIdMap.clear();
this.weakChunkIdMap.clear();
}
/**

View File

@ -57,6 +57,12 @@ public class CompactingMemStore extends AbstractMemStore {
"hbase.hregion.compacting.memstore.type";
public static final String COMPACTING_MEMSTORE_TYPE_DEFAULT =
String.valueOf(MemoryCompactionPolicy.BASIC);
// The external setting of the compacting MemStore behaviour
public static final String COMPACTING_MEMSTORE_INDEX_KEY =
"hbase.hregion.compacting.memstore.index";
// usage of CellArrayMap is default, later it will be decided how to use CellChunkMap
public static final String COMPACTING_MEMSTORE_INDEX_DEFAULT =
String.valueOf(IndexType.ARRAY_MAP);
// Default fraction of in-memory-flush size w.r.t. flush-to-disk size
public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
"hbase.memstore.inmemoryflush.threshold.factor";
@ -78,10 +84,22 @@ public class CompactingMemStore extends AbstractMemStore {
private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
private boolean compositeSnapshot = true;
/**
* Types of indexes (part of immutable segments) to be used after flattening,
* compaction, or merge are applied.
* ARRAY_MAP is the configured default (see COMPACTING_MEMSTORE_INDEX_DEFAULT).
*/
public enum IndexType {
CSLM_MAP, // ConcurrentSkipListMap
ARRAY_MAP, // CellArrayMap
CHUNK_MAP // CellChunkMap
}
private IndexType indexType = IndexType.ARRAY_MAP; // default implementation
public static final long DEEP_OVERHEAD = ClassSize.align( AbstractMemStore.DEEP_OVERHEAD
+ 6 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline,
// MemStoreCompactor, inMemoryFlushInProgress, allowCompaction
+ 7 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline,
// MemStoreCompactor, inMemoryFlushInProgress, allowCompaction,
// indexType
+ Bytes.SIZEOF_LONG // inmemoryFlushSize
+ 2 * Bytes.SIZEOF_BOOLEAN // compositeSnapshot and inWalReplay
+ 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryFlushInProgress and allowCompaction
@ -96,6 +114,8 @@ public class CompactingMemStore extends AbstractMemStore {
this.pipeline = new CompactionPipeline(getRegionServices());
this.compactor = createMemStoreCompactor(compactionPolicy);
initInmemoryFlushSize(conf);
indexType = IndexType.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
CompactingMemStore.COMPACTING_MEMSTORE_INDEX_DEFAULT));
}
@VisibleForTesting
@ -294,7 +314,19 @@ public class CompactingMemStore extends AbstractMemStore {
* The flattening happens only if versions match.
*/
public void flattenOneSegment(long requesterVersion) {
pipeline.flattenYoungestSegment(requesterVersion);
pipeline.flattenOneSegment(requesterVersion, indexType);
}
// Test-only hook: re-reads the index type from the current configuration,
// falling back to COMPACTING_MEMSTORE_INDEX_DEFAULT when the key is not set.
@VisibleForTesting
public void setIndexType() {
  String configuredType = getConfiguration().get(
      CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
      CompactingMemStore.COMPACTING_MEMSTORE_INDEX_DEFAULT);
  indexType = IndexType.valueOf(configuredType);
}
/**
* @return the index type this memstore passes on when segments are flattened, compacted or merged
*/
public IndexType getIndexType() {
return indexType;
}
public boolean hasImmutableSegments() {

View File

@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.util.ClassSize;
* method accesses the read-only copy more than once it makes a local copy of it
* to ensure it accesses the same copy.
*
* The methods getVersionedList(), getVersionedTail(), and flattenYoungestSegment() are also
* The methods getVersionedList(), getVersionedTail(), and flattenOneSegment() are also
* protected by a lock since they need to have a consistent (atomic) view of the pipeline list
* and version number.
*/
@ -183,7 +183,7 @@ public class CompactionPipeline {
*
* @return true iff a segment was successfully flattened
*/
public boolean flattenYoungestSegment(long requesterVersion) {
public boolean flattenOneSegment(long requesterVersion, CompactingMemStore.IndexType idxType) {
if(requesterVersion != version) {
LOG.warn("Segment flattening failed, because versions do not match. Requester version: "
@ -196,17 +196,22 @@ public class CompactionPipeline {
LOG.warn("Segment flattening failed, because versions do not match");
return false;
}
int i = 0;
for (ImmutableSegment s : pipeline) {
// remember the old size in case this segment is going to be flatten
MemstoreSize memstoreSize = new MemstoreSize();
if (s.flatten(memstoreSize)) {
if ( s.canBeFlattened() ) {
MemstoreSize newMemstoreSize = new MemstoreSize(); // the size to be updated
ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
(CSLMImmutableSegment)s,idxType,newMemstoreSize);
replaceAtIndex(i,newS);
if(region != null) {
region.addMemstoreSize(memstoreSize);
// update the global memstore size counter
// upon flattening there is no change in the data size
region.addMemstoreSize(new MemstoreSize(0, newMemstoreSize.getHeapSize()));
}
LOG.debug("Compaction pipeline segment " + s + " was flattened");
return true;
}
i++;
}
}
@ -271,6 +276,13 @@ public class CompactionPipeline {
if(segment != null) pipeline.addLast(segment);
}
// Replaces the segment at position idx of the pipeline with newSegment and then rebuilds
// readOnlyCopy from the updated pipeline.
// Must be called only from within a synchronized block.
private void replaceAtIndex(int idx, ImmutableSegment newSegment) {
pipeline.set(idx, newSegment);
readOnlyCopy = new LinkedList<>(pipeline);
}
public Segment getTail() {
List<? extends Segment> localCopy = getSegments();
if(localCopy.isEmpty()) {

View File

@ -184,6 +184,16 @@ public class CompositeImmutableSegment extends ImmutableSegment {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
@Override
// Not supported here: always throws IllegalStateException.
// NOTE(review): the message says "CompositeImmutableScanner" while the enclosing class is
// CompositeImmutableSegment; it matches the neighbouring message, so confirm the intended wording.
protected long indexEntrySize() {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
@Override protected boolean canBeFlattened() {
// a composite segment never reports itself as a candidate for flattening
return false;
}
/**
* @return Sum of all cell sizes.
*/

View File

@ -45,6 +45,15 @@ public class ImmutableMemStoreLAB implements MemStoreLAB {
throw new IllegalStateException("This is an Immutable MemStoreLAB.");
}
@Override
// Hands out a new chunk without replacing the current chunk;
// the space on the returned chunk will be allocated externally.
// The request is delegated to the first MemStoreLAB in the list.
public Chunk getNewExternalChunk() {
  return this.mslabs.get(0).getNewExternalChunk();
}
@Override
public void close() {
// 'openScannerCount' here tracks the scanners opened on segments which directly refer to this

View File

@ -19,15 +19,12 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.TimeRange;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -37,21 +34,11 @@ import java.util.List;
* and is not needed for a {@link MutableSegment}.
*/
@InterfaceAudience.Private
public class ImmutableSegment extends Segment {
public abstract class ImmutableSegment extends Segment {
private static final long DEEP_OVERHEAD = Segment.DEEP_OVERHEAD
+ (2 * ClassSize.REFERENCE) // Refs to timeRange and type
+ ClassSize.TIMERANGE;
public static final long DEEP_OVERHEAD_CSLM = DEEP_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP;
public static final long DEEP_OVERHEAD_CAM = DEEP_OVERHEAD + ClassSize.CELL_ARRAY_MAP;
/**
* Types of ImmutableSegment
*/
public enum Type {
SKIPLIST_MAP_BASED,
ARRAY_MAP_BASED,
}
public static final long DEEP_OVERHEAD = Segment.DEEP_OVERHEAD
+ ClassSize.align(ClassSize.REFERENCE // Referent to timeRange
+ ClassSize.TIMERANGE);
/**
* This is an immutable segment so use the read-only TimeRange rather than the heavy-weight
@ -59,12 +46,8 @@ public class ImmutableSegment extends Segment {
*/
private final TimeRange timeRange;
private Type type = Type.SKIPLIST_MAP_BASED;
// whether it is based on CellFlatMap or ConcurrentSkipListMap
private boolean isFlat(){
return (type != Type.SKIPLIST_MAP_BASED);
}
// each sub-type of immutable segment knows whether it is flat or not
protected abstract boolean canBeFlattened();
///////////////////// CONSTRUCTORS /////////////////////
/**------------------------------------------------------------------------
@ -76,59 +59,25 @@ public class ImmutableSegment extends Segment {
}
/**------------------------------------------------------------------------
* Copy C-tor to be used when new ImmutableSegment is being built from a Mutable one.
* C-tor to be used to build the derived classes
*/
protected ImmutableSegment(CellSet cs, CellComparator comparator, MemStoreLAB memStoreLAB) {
super(cs, comparator, memStoreLAB);
this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange();
}
/**------------------------------------------------------------------------
* Copy C-tor to be used when new CSLMImmutableSegment (derived) is being built from a Mutable one.
* This C-tor should be used when active MutableSegment is pushed into the compaction
* pipeline and becomes an ImmutableSegment.
*/
protected ImmutableSegment(Segment segment) {
super(segment);
this.type = Type.SKIPLIST_MAP_BASED;
this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange();
}
/**------------------------------------------------------------------------
* C-tor to be used when new CELL_ARRAY BASED ImmutableSegment is a result of compaction of a
* list of older ImmutableSegments.
* The given iterator returns the Cells that "survived" the compaction.
* The input parameter "type" exists for future use when more types of flat ImmutableSegments
* are going to be introduced.
*/
protected ImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
MemStoreLAB memStoreLAB, int numOfCells, Type type, boolean merge) {
super(null, // initiailize the CellSet with NULL
comparator, memStoreLAB);
this.type = type;
// build the new CellSet based on CellArrayMap
CellSet cs = createCellArrayMapSet(numOfCells, iterator, merge);
this.setCellSet(null, cs); // update the CellSet of the new Segment
this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange();
}
/**------------------------------------------------------------------------
* C-tor to be used when new SKIP-LIST BASED ImmutableSegment is a result of compaction of a
* list of older ImmutableSegments.
* The given iterator returns the Cells that "survived" the compaction.
*/
protected ImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
MemStoreLAB memStoreLAB) {
super(new CellSet(comparator), // initiailize the CellSet with empty CellSet
comparator, memStoreLAB);
type = Type.SKIPLIST_MAP_BASED;
while (iterator.hasNext()) {
Cell c = iterator.next();
// The scanner is doing all the elimination logic
// now we just copy it to the new segment
Cell newKV = maybeCloneWithAllocator(c);
boolean usedMSLAB = (newKV != c);
internalAdd(newKV, usedMSLAB, null);
}
this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange();
}
///////////////////// PUBLIC METHODS /////////////////////
@Override
public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
return this.timeRange.includesTimeRange(scan.getTimeRange()) &&
@ -148,107 +97,4 @@ public class ImmutableSegment extends Segment {
List<Segment> res = new ArrayList<>(Arrays.asList(this));
return res;
}
/**------------------------------------------------------------------------
* Change the CellSet of this ImmutableSegment from one based on ConcurrentSkipListMap to one
* based on CellArrayMap.
* If this ImmutableSegment is not based on ConcurrentSkipListMap , this is NOOP
*
* Synchronization of the CellSet replacement:
* The reference to the CellSet is AtomicReference and is updated only when ImmutableSegment
* is constructed (single thread) or flattened. The flattening happens as part of a single
* thread of compaction, but to be on the safe side the initial CellSet is locally saved
* before the flattening and then replaced using CAS instruction.
*/
public boolean flatten(MemstoreSize memstoreSize) {
if (isFlat()) return false;
CellSet oldCellSet = getCellSet();
int numOfCells = getCellsCount();
// build the new (CellSet CellArrayMap based)
CellSet newCellSet = recreateCellArrayMapSet(numOfCells);
type = Type.ARRAY_MAP_BASED;
setCellSet(oldCellSet,newCellSet);
// arrange the meta-data size, decrease all meta-data sizes related to SkipList
// (recreateCellArrayMapSet doesn't take the care for the sizes)
long newSegmentSizeDelta = -(numOfCells * ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
// add size of CellArrayMap and meta-data overhead per Cell
newSegmentSizeDelta = newSegmentSizeDelta + numOfCells * ClassSize.CELL_ARRAY_MAP_ENTRY;
incSize(0, newSegmentSizeDelta);
if (memstoreSize != null) {
memstoreSize.incMemstoreSize(0, newSegmentSizeDelta);
}
return true;
}
///////////////////// PRIVATE METHODS /////////////////////
/*------------------------------------------------------------------------*/
// Create CellSet based on CellArrayMap from compacting iterator
private CellSet createCellArrayMapSet(int numOfCells, MemStoreSegmentsIterator iterator,
boolean merge) {
Cell[] cells = new Cell[numOfCells]; // build the Cell Array
int i = 0;
while (iterator.hasNext()) {
Cell c = iterator.next();
// The scanner behind the iterator is doing all the elimination logic
if (merge) {
// if this is merge we just move the Cell object without copying MSLAB
// the sizes still need to be updated in the new segment
cells[i] = c;
} else {
// now we just copy it to the new segment (also MSLAB copy)
cells[i] = maybeCloneWithAllocator(c);
}
boolean useMSLAB = (getMemStoreLAB()!=null);
// second parameter true, because in compaction/merge the addition of the cell to new segment
// is always successful
updateMetaInfo(c, true, useMSLAB, null); // updates the size per cell
i++;
}
// build the immutable CellSet
CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, i, false);
return new CellSet(cam);
}
@Override
protected long heapSizeChange(Cell cell, boolean succ) {
if (succ) {
switch (this.type) {
case SKIPLIST_MAP_BASED:
return super.heapSizeChange(cell, succ);
case ARRAY_MAP_BASED:
return ClassSize.align(ClassSize.CELL_ARRAY_MAP_ENTRY + CellUtil.estimatedHeapSizeOf(cell));
}
}
return 0;
}
/*------------------------------------------------------------------------*/
// Create CellSet based on CellArrayMap from current ConcurrentSkipListMap based CellSet
// (without compacting iterator)
private CellSet recreateCellArrayMapSet(int numOfCells) {
Cell[] cells = new Cell[numOfCells]; // build the Cell Array
Cell curCell;
int idx = 0;
// create this segment scanner with maximal possible read point, to go over all Cells
KeyValueScanner segmentScanner = this.getScanner(Long.MAX_VALUE);
try {
while ((curCell = segmentScanner.next()) != null) {
cells[idx++] = curCell;
}
} catch (IOException ie) {
throw new IllegalStateException(ie);
} finally {
segmentScanner.close();
}
// build the immutable CellSet
CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, idx, false);
return new CellSet(cam);
}
}

View File

@ -80,7 +80,7 @@ public class MemStoreCompactor {
* Note that every value covers the previous ones, i.e. if MERGE is the action it implies
* that the youngest segment is going to be flattened anyway.
*/
private enum Action {
public enum Action {
NOOP,
FLATTEN, // flatten the youngest segment in the pipeline
MERGE, // merge all the segments in the pipeline into one
@ -160,7 +160,7 @@ public class MemStoreCompactor {
if (action == Action.COMPACT) { // compact according to the user request
LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName()
+ " is going to be compacted, number of"
+ " is going to be compacted to the " + compactingMemStore.getIndexType() + ". Number of"
+ " cells before compaction is " + versionedList.getNumOfCells());
return Action.COMPACT;
}
@ -170,13 +170,15 @@ public class MemStoreCompactor {
int numOfSegments = versionedList.getNumOfSegments();
if (numOfSegments > pipelineThreshold) {
LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName()
+ " is going to be merged, as there are " + numOfSegments + " segments");
+ " is going to be merged to the " + compactingMemStore.getIndexType()
+ ", as there are " + numOfSegments + " segments");
return Action.MERGE; // to avoid too many segments, merge now
}
// if nothing of the above, then just flatten the newly joined segment
LOG.debug("The youngest segment in the in-Memory Compaction Pipeline for store "
+ compactingMemStore.getFamilyName() + " is going to be flattened");
+ compactingMemStore.getFamilyName() + " is going to be flattened to the "
+ compactingMemStore.getIndexType());
return Action.FLATTEN;
}
@ -252,7 +254,7 @@ public class MemStoreCompactor {
result = SegmentFactory.instance().createImmutableSegmentByCompaction(
compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
versionedList.getNumOfCells(), ImmutableSegment.Type.ARRAY_MAP_BASED);
versionedList.getNumOfCells(), compactingMemStore.getIndexType());
iterator.close();
break;
case MERGE:
@ -263,8 +265,8 @@ public class MemStoreCompactor {
result = SegmentFactory.instance().createImmutableSegmentByMerge(
compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
versionedList.getNumOfCells(), ImmutableSegment.Type.ARRAY_MAP_BASED,
versionedList.getStoreSegments());
versionedList.getNumOfCells(), versionedList.getStoreSegments(),
compactingMemStore.getIndexType());
iterator.close();
break;
default: throw new RuntimeException("Unknown action " + action); // sanity check

View File

@ -83,6 +83,12 @@ public interface MemStoreLAB {
*/
void decScannerCount();
/**
* Return a new empty chunk without considering this chunk as current.
* The space on this chunk will be allocated externally.
* @return the newly obtained chunk
*/
Chunk getNewExternalChunk();
public static MemStoreLAB newInstance(Configuration conf) {
MemStoreLAB memStoreLAB = null;
if (isEnabled(conf)) {

View File

@ -253,6 +253,17 @@ public class MemStoreLABImpl implements MemStoreLAB {
return null;
}
// Hands out a new chunk while leaving the current chunk in place,
// i.e. MSLABImpl does not make the returned chunk its CurChunk.
// The space on the returned chunk will be allocated externally.
// This method is meant for external callers only.
@Override
public Chunk getNewExternalChunk() {
  Chunk externalChunk = this.chunkCreator.getChunk();
  // remember the ID of the handed-out chunk among this MSLAB's chunks
  chunks.add(externalChunk.getId());
  return externalChunk;
}
@VisibleForTesting
Chunk getCurrentChunk() {
return this.curChunk.get();

View File

@ -42,6 +42,7 @@ public class MutableSegment extends Segment {
protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
super(cellSet, comparator, memStoreLAB);
incSize(0,DEEP_OVERHEAD); // update the mutable segment metadata
}
/**
@ -121,4 +122,8 @@ public class MutableSegment extends Segment {
public long getMinTimestamp() {
return this.timeRangeTracker.getMin();
}
@Override protected long indexEntrySize() {
// the per-cell index overhead reported here is one ConcurrentSkipListMap entry
return ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY;
}
}

View File

@ -305,6 +305,10 @@ public abstract class Segment {
}
}
// Convenience overload: the MSLAB flag is derived from whether this segment has a MemStoreLAB,
// and the call is then forwarded to the four-argument updateMetaInfo.
protected void updateMetaInfo(Cell cellToAdd, boolean succ, MemstoreSize memstoreSize) {
  boolean mslabUsed = getMemStoreLAB() != null;
  updateMetaInfo(cellToAdd, succ, mslabUsed, memstoreSize);
}
/**
* @return The increase in heap size because of this cell addition. This includes this cell POJO's
* heap size itself and additional overhead because of addition on to CSLM.
@ -312,11 +316,13 @@ public abstract class Segment {
protected long heapSizeChange(Cell cell, boolean succ) {
if (succ) {
return ClassSize
.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + CellUtil.estimatedHeapSizeOf(cell));
.align(indexEntrySize() + CellUtil.estimatedHeapSizeOf(cell));
}
return 0;
}
/**
* @return the per-cell size of this segment's index entry; heapSizeChange adds it to the
* estimated heap size of the cell
*/
protected abstract long indexEntrySize();
/**
* Returns a subset of the segment cell set, which starts with the given cell
* @param firstCell a cell in the segment

View File

@ -18,7 +18,6 @@
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -41,42 +40,36 @@ public final class SegmentFactory {
return instance;
}
// create skip-list-based (non-flat) immutable segment from compacting old immutable segments
public ImmutableSegment createImmutableSegment(final Configuration conf,
final CellComparator comparator, MemStoreSegmentsIterator iterator) {
return new ImmutableSegment(comparator, iterator, MemStoreLAB.newInstance(conf));
}
// create composite immutable segment from a list of segments
// for snapshot consisting of multiple segments
public CompositeImmutableSegment createCompositeImmutableSegment(
final CellComparator comparator, List<ImmutableSegment> segments) {
return new CompositeImmutableSegment(comparator, segments);
}
// create new flat immutable segment from compacting old immutable segments
// for compaction
public ImmutableSegment createImmutableSegmentByCompaction(final Configuration conf,
final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
ImmutableSegment.Type segmentType)
CompactingMemStore.IndexType idxType)
throws IOException {
Preconditions.checkArgument(segmentType == ImmutableSegment.Type.ARRAY_MAP_BASED,
"wrong immutable segment type");
MemStoreLAB memStoreLAB = MemStoreLAB.newInstance(conf);
return
// the last parameter "false" means not to merge, but to compact the pipeline
// in order to create the new segment
new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, false);
createImmutableSegment(
conf,comparator,iterator,memStoreLAB,numOfCells,MemStoreCompactor.Action.COMPACT,idxType);
}
/**
 * Creates an empty immutable segment, meant for initializations: a mutable segment is generated
 * with {@code generateMutableSegment(null, comparator, null)} and then turned into an immutable
 * one.
 */
public ImmutableSegment createImmutableSegment(CellComparator comparator) {
  return createImmutableSegment(generateMutableSegment(null, comparator, null));
}
// create immutable segment from mutable segment
// create not-flat immutable segment from mutable segment
public ImmutableSegment createImmutableSegment(MutableSegment segment) {
return new ImmutableSegment(segment);
return new CSLMImmutableSegment(segment);
}
// create mutable segment
@ -86,19 +79,58 @@ public final class SegmentFactory {
}
// create new flat immutable segment from merging old immutable segments
// for merge
public ImmutableSegment createImmutableSegmentByMerge(final Configuration conf,
final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
ImmutableSegment.Type segmentType, List<ImmutableSegment> segments)
List<ImmutableSegment> segments, CompactingMemStore.IndexType idxType)
throws IOException {
Preconditions.checkArgument(segmentType == ImmutableSegment.Type.ARRAY_MAP_BASED,
"wrong immutable segment type");
MemStoreLAB memStoreLAB = getMergedMemStoreLAB(conf, segments);
return
// the last parameter "true" means to merge the compaction pipeline
// in order to create the new segment
new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, true);
createImmutableSegment(
conf,comparator,iterator,memStoreLAB,numOfCells,MemStoreCompactor.Action.MERGE,idxType);
}
/**
 * Creates a flat immutable segment from a non-flat immutable segment; used for flattening.
 * @param segment the non-flat segment to be flattened
 * @param idxType index type of the resulting segment; must be {@code CHUNK_MAP} or
 *          {@code ARRAY_MAP}
 * @param memstoreSize passed through to the constructor of the new flat segment
 * @return the newly created flat segment, never {@code null}
 * @throws IllegalArgumentException if {@code idxType} is not a flat index type (for example
 *           {@code CSLM_MAP}): a non-flat segment can not be the result of flattening
 */
public ImmutableSegment createImmutableSegmentByFlattening(
    CSLMImmutableSegment segment, CompactingMemStore.IndexType idxType, MemstoreSize memstoreSize) {
  switch (idxType) {
    case CHUNK_MAP:
      return new CellChunkImmutableSegment(segment, memstoreSize);
    case ARRAY_MAP:
      return new CellArrayImmutableSegment(segment, memstoreSize);
    case CSLM_MAP:
    default:
      // Fail explicitly rather than via "assert false": assertions are disabled by default at
      // runtime, in which case the caller would silently be handed a null segment.
      throw new IllegalArgumentException(
          "Flattening can not produce a non-flat segment, index type: " + idxType);
  }
}
//****** private methods to instantiate concrete store segments **********//
/**
 * Instantiates the concrete flat immutable segment for the requested index type.
 * @param conf not read by this method
 * @param comparator passed through to the segment constructor
 * @param iterator passed through to the segment constructor
 * @param memStoreLAB passed through to the segment constructor
 * @param numOfCells passed through to the segment constructor
 * @param action passed through to the segment constructor
 * @param idxType index type of the resulting segment; must be {@code CHUNK_MAP} or
 *          {@code ARRAY_MAP}
 * @return the newly created flat segment, never {@code null}
 * @throws IllegalArgumentException if {@code idxType} is not a flat index type (for example
 *           {@code CSLM_MAP}): a non-flat segment can not be created here
 */
private ImmutableSegment createImmutableSegment(final Configuration conf, final CellComparator comparator,
    MemStoreSegmentsIterator iterator, MemStoreLAB memStoreLAB, int numOfCells,
    MemStoreCompactor.Action action, CompactingMemStore.IndexType idxType) {
  switch (idxType) {
    case CHUNK_MAP:
      return new CellChunkImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, action);
    case ARRAY_MAP:
      return new CellArrayImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, action);
    case CSLM_MAP:
    default:
      // Fail explicitly rather than via "assert false": assertions are disabled by default at
      // runtime, in which case the caller would silently be handed a null segment.
      throw new IllegalArgumentException(
          "A non-flat segment can not be created here, index type: " + idxType);
  }
}
private MutableSegment generateMutableSegment(final Configuration conf, CellComparator comparator,
MemStoreLAB memStoreLAB) {

View File

@ -369,6 +369,7 @@ public class TestHeapSize {
if (expected != actual) {
ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(TimeRangeTracker.class, true);
@ -376,9 +377,28 @@ public class TestHeapSize {
assertEquals(expected, actual);
}
// ImmutableSegment Deep overhead
// ImmutableSegments Deep overhead
cl = ImmutableSegment.class;
actual = ImmutableSegment.DEEP_OVERHEAD_CSLM;
actual = ImmutableSegment.DEEP_OVERHEAD;
expected = ClassSize.estimateBase(cl, false);
expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(AtomicReference.class, false);
expected += ClassSize.estimateBase(CellSet.class, false);
expected += ClassSize.estimateBase(TimeRangeTracker.class, false);
expected += ClassSize.estimateBase(TimeRange.class, false);
if (expected != actual) {
ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(TimeRangeTracker.class, true);
ClassSize.estimateBase(TimeRange.class, true);
assertEquals(expected, actual);
}
cl = CSLMImmutableSegment.class;
actual = CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
expected = ClassSize.estimateBase(cl, false);
expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(AtomicReference.class, false);
@ -389,6 +409,7 @@ public class TestHeapSize {
if (expected != actual) {
ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(TimeRangeTracker.class, true);
@ -396,7 +417,8 @@ public class TestHeapSize {
ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
assertEquals(expected, actual);
}
actual = ImmutableSegment.DEEP_OVERHEAD_CAM;
cl = CellArrayImmutableSegment.class;
actual = CellArrayImmutableSegment.DEEP_OVERHEAD_CAM;
expected = ClassSize.estimateBase(cl, false);
expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(AtomicReference.class, false);
@ -407,6 +429,7 @@ public class TestHeapSize {
if (expected != actual) {
ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(TimeRangeTracker.class, true);

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -286,8 +287,8 @@ public class TestCellFlatSet extends TestCase {
ByteBuffer idxBuffer = idxChunk.getData(); // the buffers of the chunks
ByteBuffer dataBuffer = dataChunk.getData();
int dataOffset = Bytes.SIZEOF_INT; // offset inside data buffer
int idxOffset = Bytes.SIZEOF_INT; // skip the space for chunk ID
int dataOffset = ChunkCreator.SIZEOF_CHUNK_HEADER; // offset inside data buffer
int idxOffset = ChunkCreator.SIZEOF_CHUNK_HEADER; // skip the space for chunk ID
Cell[] cellArray = asc ? ascCells : descCells;
@ -296,16 +297,16 @@ public class TestCellFlatSet extends TestCase {
if (dataOffset + KeyValueUtil.length(kv) > chunkCreator.getChunkSize()) {
dataChunk = chunkCreator.getChunk(); // allocate more data chunks if needed
dataBuffer = dataChunk.getData();
dataOffset = Bytes.SIZEOF_INT;
dataOffset = ChunkCreator.SIZEOF_CHUNK_HEADER;
}
int dataStartOfset = dataOffset;
dataOffset = KeyValueUtil.appendTo(kv, dataBuffer, dataOffset, false); // write deep cell data
// do we have enough space to write the cell-representation on the index chunk?
if (idxOffset + CellChunkMap.SIZEOF_CELL_REP > chunkCreator.getChunkSize()) {
if (idxOffset + ClassSize.CELL_CHUNK_MAP_ENTRY > chunkCreator.getChunkSize()) {
idxChunk = chunkCreator.getChunk(); // allocate more index chunks if needed
idxBuffer = idxChunk.getData();
idxOffset = Bytes.SIZEOF_INT;
idxOffset = ChunkCreator.SIZEOF_CHUNK_HEADER;
chunkArray[chunkArrayIdx++] = idxChunk;
}
idxOffset = ByteBufferUtils.putInt(idxBuffer, idxOffset, dataChunk.getId()); // write data chunk id
@ -314,8 +315,6 @@ public class TestCellFlatSet extends TestCase {
idxOffset = ByteBufferUtils.putLong(idxBuffer, idxOffset, kv.getSequenceId()); // seqId
}
return asc ?
new CellChunkMap(CellComparator.COMPARATOR,chunkArray,0,NUM_OF_CELLS,false) :
new CellChunkMap(CellComparator.COMPARATOR,chunkArray,0,NUM_OF_CELLS,true);
return new CellChunkMap(CellComparator.COMPARATOR,chunkArray,0,NUM_OF_CELLS,!asc);
}
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
@ -98,7 +99,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
.getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
globalMemStoreLimit, 0.4f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
assertTrue(chunkCreator != null);
}
@ -563,16 +564,71 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
assertTrue(chunkCreator.getPoolSize() > 0);
}
/**
 * Checks size accounting when a BASIC-policy memstore is set to flatten into a CellChunkMap:
 * eight cells are added and the data size and the CSLM-based heap size are verified; after the
 * in-memory flush the heap size must match the CellChunkMap-based estimate while the data size
 * stays the same; finally the snapshot must hold all eight cells.
 */
@Test
public void testFlatteningToCellChunkMap() throws IOException {
  // configure the memstore: BASIC policy, index of type CHUNK_MAP
  final MemoryCompactionPolicy policy = MemoryCompactionPolicy.BASIC;
  memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(policy));
  final CompactingMemStore compactingMemStore = (CompactingMemStore) memstore;
  compactingMemStore.initiateType(policy);
  memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
      String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
  compactingMemStore.setIndexType();

  final int numOfCells = 8;
  // A1, A2, B3, C4, D5, D6, E7, F8
  final String[] rowKeys = { "A", "A", "B", "C", "D", "D", "E", "F" };

  // a sample cell, only used to derive the expected per-cell sizes
  final KeyValue sampleCell = new KeyValue(Bytes.toBytes(rowKeys[0]),
      Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
      System.currentTimeMillis(), Bytes.toBytes(rowKeys[0] + 0));

  // test 1 bucket
  final int totalCellsLen = addRowsByKeys(memstore, rowKeys);
  final long heapPerCellOnCSLM = ClassSize.align(
      ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD
          + KeyValueUtil.length(sampleCell));
  final long expectedHeapBeforeFlush =
      numOfCells * heapPerCellOnCSLM + MutableSegment.DEEP_OVERHEAD;
  assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
  assertEquals(expectedHeapBeforeFlush, compactingMemStore.heapSize());

  compactingMemStore.flushInMemory(); // push keys to pipeline and flatten
  assertEquals(0, memstore.getSnapshot().getCellsCount());

  // One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode,
  // so the total cells length is expected to remain the same
  final long heapPerCellOnCCM =
      ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(sampleCell));
  final long expectedHeapAfterFlush = MutableSegment.DEEP_OVERHEAD
      + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM + numOfCells * heapPerCellOnCCM;
  assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
  assertEquals(expectedHeapAfterFlush, compactingMemStore.heapSize());

  final MemstoreSize flushableSize = memstore.getFlushableSize();
  final MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
  region.decrMemstoreSize(flushableSize); // simulate flusher
  final ImmutableSegment snapshotSegment = memstore.getSnapshot();
  assertEquals(numOfCells, snapshotSegment.getCellsCount());
  assertEquals(0, regionServicesForStores.getMemstoreSize());

  memstore.clearSnapshot(snapshot.getId());
}
//////////////////////////////////////////////////////////////////////////////
// Compaction tests
//////////////////////////////////////////////////////////////////////////////
@Test
public void testCompaction1Bucket() throws IOException {
// set memstore to do data compaction and not to use the speculative scan
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(compactionType));
// set memstore to do basic structure flattening, the "eager" option is tested in
// TestCompactingToCellFlatMapMemStore
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
memstore.getConfiguration()
.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(compactionType));
((CompactingMemStore)memstore).initiateType(compactionType);
String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4
@ -581,25 +637,24 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
int totalCellsLen = addRowsByKeys(memstore, keys1);
int oneCellOnCSLMHeapSize = 120;
int oneCellOnCAHeapSize = 88;
long totalHeapSize = 4 * oneCellOnCSLMHeapSize;
long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
assertEquals(0, memstore.getSnapshot().getCellsCount());
// There is no compaction, as the compacting memstore type is basic.
// totalCellsLen remains the same
totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
+ 4 * oneCellOnCAHeapSize;
assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
MemstoreSize size = memstore.getFlushableSize();
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
assertEquals(0, memstore.getSnapshot().getCellsCount());
// One cell is duplicated and the compaction will remove it. All cells of same size so adjusting
// totalCellsLen
totalCellsLen = (totalCellsLen * 3) / 4;
totalHeapSize = 3 * oneCellOnCAHeapSize;
assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
size = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
region.decrMemstoreSize(size); // simulate flusher
ImmutableSegment s = memstore.getSnapshot();
assertEquals(3, s.getCellsCount());
assertEquals(4, s.getCellsCount());
assertEquals(0, regionServicesForStores.getMemstoreSize());
memstore.clearSnapshot(snapshot.getId());
@ -608,8 +663,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
@Test
public void testCompaction2Buckets() throws IOException {
// set memstore to do data compaction and not to use the speculative scan
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
// set memstore to do basic structure flattening, the "eager" option is tested in
// TestCompactingToCellFlatMapMemStore
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(compactionType));
((CompactingMemStore)memstore).initiateType(compactionType);
@ -619,24 +675,23 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
int totalCellsLen1 = addRowsByKeys(memstore, keys1);
int oneCellOnCSLMHeapSize = 120;
int oneCellOnCAHeapSize = 88;
long totalHeapSize = 4 * oneCellOnCSLMHeapSize;
long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
MemstoreSize size = memstore.getFlushableSize();
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
int counter = 0;
for ( Segment s : memstore.getSegments()) {
counter += s.getCellsCount();
}
assertEquals(3, counter);
assertEquals(4, counter);
assertEquals(0, memstore.getSnapshot().getCellsCount());
// One cell is duplicated and the compaction will remove it. All cells of same time so adjusting
// totalCellsLen
totalCellsLen1 = (totalCellsLen1 * 3) / 4;
// There is no compaction, as the compacting memstore type is basic.
// totalCellsLen remains the same
assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
totalHeapSize = 3 * oneCellOnCAHeapSize;
totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
+ 4 * oneCellOnCAHeapSize;
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
int totalCellsLen2 = addRowsByKeys(memstore, keys2);
@ -644,19 +699,19 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
size = memstore.getFlushableSize();
MemstoreSize size = memstore.getFlushableSize();
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
assertEquals(0, memstore.getSnapshot().getCellsCount());
totalCellsLen2 = totalCellsLen2 / 3;// 2 cells duplicated in set 2
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
totalHeapSize = 4 * oneCellOnCAHeapSize;
totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
+ 7 * oneCellOnCAHeapSize;
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
size = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
region.decrMemstoreSize(size); // simulate flusher
ImmutableSegment s = memstore.getSnapshot();
assertEquals(4, s.getCellsCount());
assertEquals(7, s.getCellsCount());
assertEquals(0, regionServicesForStores.getMemstoreSize());
memstore.clearSnapshot(snapshot.getId());
@ -678,10 +733,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
int oneCellOnCSLMHeapSize = 120;
int oneCellOnCAHeapSize = 88;
assertEquals(totalCellsLen1, region.getMemstoreSize());
long totalHeapSize = 4 * oneCellOnCSLMHeapSize;
long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
MemstoreSize size = memstore.getFlushableSize();
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
assertEquals(0, memstore.getSnapshot().getCellsCount());
@ -690,29 +743,31 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
totalCellsLen1 = (totalCellsLen1 * 3) / 4;
assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
// In memory flush to make a CellArrayMap instead of CSLM. See the overhead diff.
totalHeapSize = 3 * oneCellOnCAHeapSize;
totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
+ 3 * oneCellOnCAHeapSize;
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
int totalCellsLen2 = addRowsByKeys(memstore, keys2);// Adding 3 more cells.
long totalHeapSize2 = 3 * oneCellOnCSLMHeapSize;
long totalHeapSize2 = totalHeapSize + 3 * oneCellOnCSLMHeapSize;
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
assertEquals(totalHeapSize + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
((CompactingMemStore) memstore).disableCompaction();
size = memstore.getFlushableSize();
MemstoreSize size = memstore.getFlushableSize();
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
assertEquals(0, memstore.getSnapshot().getCellsCount());
// No change in the cells data size. ie. memstore size. as there is no compaction.
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
assertEquals(totalHeapSize + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
assertEquals(totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM,
((CompactingMemStore) memstore).heapSize());
int totalCellsLen3 = addRowsByKeys(memstore, keys3);// 3 more cells added
assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
regionServicesForStores.getMemstoreSize());
long totalHeapSize3 = 3 * oneCellOnCSLMHeapSize;
assertEquals(totalHeapSize + totalHeapSize2 + totalHeapSize3,
((CompactingMemStore) memstore).heapSize());
long totalHeapSize3 = totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
+ 3 * oneCellOnCSLMHeapSize;
assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize());
((CompactingMemStore)memstore).enableCompaction();
size = memstore.getFlushableSize();
@ -725,7 +780,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
regionServicesForStores.getMemstoreSize());
// Only 4 unique cells left
assertEquals(4 * oneCellOnCAHeapSize, ((CompactingMemStore) memstore).heapSize());
assertEquals(4 * oneCellOnCAHeapSize + MutableSegment.DEEP_OVERHEAD
+ CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, ((CompactingMemStore) memstore).heapSize());
size = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot

View File

@ -1,492 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.List;
/**
* compacted memstore test case
*/
@Category({RegionServerTests.class, MediumTests.class})
public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore {
private static final Log LOG = LogFactory.getLog(TestCompactingToCellArrayMapMemStore.class);
//////////////////////////////////////////////////////////////////////////////
// Helpers
//////////////////////////////////////////////////////////////////////////////
/** Clears the chunks held in the pool of {@code chunkCreator} after each test. */
@Override
public void tearDown() throws Exception {
  chunkCreator.clearChunksInPool();
}
/**
 * Runs {@code compactingSetUp()} and then installs a fresh {@link CompactingMemStore} whose
 * configuration and constructor argument both select the EAGER compaction policy.
 */
@Override
public void setUp() throws Exception {
  compactingSetUp();

  // set memstore to do data compaction
  final MemoryCompactionPolicy policy = MemoryCompactionPolicy.EAGER;
  final Configuration conf = HBaseConfiguration.create();
  conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(policy));

  this.memstore = new CompactingMemStore(
      conf, CellComparator.COMPARATOR, store, regionServicesForStores, policy);
}
//////////////////////////////////////////////////////////////////////////////
// Compaction tests
//////////////////////////////////////////////////////////////////////////////
/**
 * Adds four cells (one of them a duplicate), flushes in memory and expects three cells to be
 * left, with the data size reduced to 3/4 and the heap size equal to three cells at the
 * per-cell size expected after the flush; the snapshot must then contain three cells.
 */
public void testCompaction1Bucket() throws IOException {
  final int heapPerCellOnCSLM = 120;
  final int heapPerCellOnCA = 88;
  final String[] rowKeys = { "A", "A", "B", "C" }; //A1, A2, B3, C4

  // test 1 bucket
  final long addedCellsLen = addRowsByKeys(memstore, rowKeys);
  final long heapBeforeFlush = 4 * heapPerCellOnCSLM;
  assertEquals(addedCellsLen, regionServicesForStores.getMemstoreSize());
  final CompactingMemStore compactingMemStore = (CompactingMemStore) memstore;
  assertEquals(heapBeforeFlush, compactingMemStore.heapSize());
  assertEquals(4, memstore.getActive().getCellsCount());

  MemstoreSize flushableSize = memstore.getFlushableSize();
  compactingMemStore.flushInMemory(); // push keys to pipeline and compact
  assertEquals(0, memstore.getSnapshot().getCellsCount());

  // One cell is duplicated and the compaction will remove it. All cells are of the same size,
  // so the expected data size is adjusted proportionally
  final long compactedCellsLen = (addedCellsLen * 3) / 4;
  assertEquals(compactedCellsLen, regionServicesForStores.getMemstoreSize());
  final long heapAfterFlush = 3 * heapPerCellOnCA;
  assertEquals(heapAfterFlush, compactingMemStore.heapSize());

  int cellsInSegments = 0;
  for (Segment segment : memstore.getSegments()) {
    cellsInSegments += segment.getCellsCount();
  }
  assertEquals(3, cellsInSegments);

  flushableSize = memstore.getFlushableSize();
  final MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
  region.decrMemstoreSize(flushableSize); // simulate flusher
  final ImmutableSegment snapshotSegment = memstore.getSnapshot();
  assertEquals(3, snapshotSegment.getCellsCount());
  assertEquals(0, regionServicesForStores.getMemstoreSize());

  memstore.clearSnapshot(snapshot.getId());
}
/**
 * Adds two batches of cells with an in-memory flush after each one. After the first flush three
 * cells are expected to remain, after the second flush four; data size and heap size are
 * checked at every step, and the snapshot must finally contain four cells.
 */
public void testCompaction2Buckets() throws IOException {
  final int heapPerCellOnCSLM = 120;
  final int heapPerCellOnCA = 88;
  final String[] firstKeys = { "A", "A", "B", "C" };
  final String[] secondKeys = { "A", "B", "D" };

  // first batch
  final long firstAddedLen = addRowsByKeys(memstore, firstKeys);
  final long firstHeapOnCSLM = 4 * heapPerCellOnCSLM;
  assertEquals(firstAddedLen, regionServicesForStores.getMemstoreSize());
  final CompactingMemStore compactingMemStore = (CompactingMemStore) memstore;
  assertEquals(firstHeapOnCSLM, compactingMemStore.heapSize());

  MemstoreSize flushableSize = memstore.getFlushableSize();
  compactingMemStore.flushInMemory(); // push keys to pipeline and compact
  int cellsInSegments = 0;
  for (Segment segment : memstore.getSegments()) {
    cellsInSegments += segment.getCellsCount();
  }
  assertEquals(3, cellsInSegments);
  assertEquals(0, memstore.getSnapshot().getCellsCount());

  // One cell is duplicated and the compaction will remove it. All cells are of the same size,
  // so the expected data size is adjusted proportionally
  final long firstLen = (firstAddedLen * 3) / 4;
  final long firstHeap = 3 * heapPerCellOnCA;
  assertEquals(firstLen, regionServicesForStores.getMemstoreSize());
  assertEquals(firstHeap, compactingMemStore.heapSize());

  // second batch
  final long secondAddedLen = addRowsByKeys(memstore, secondKeys);
  final long secondHeapOnCSLM = 3 * heapPerCellOnCSLM;
  assertEquals(firstLen + secondAddedLen, regionServicesForStores.getMemstoreSize());
  assertEquals(firstHeap + secondHeapOnCSLM, compactingMemStore.heapSize());

  flushableSize = memstore.getFlushableSize();
  compactingMemStore.flushInMemory(); // push keys to pipeline and compact
  assertEquals(0, memstore.getSnapshot().getCellsCount());
  cellsInSegments = 0;
  for (Segment segment : memstore.getSegments()) {
    cellsInSegments += segment.getCellsCount();
  }
  assertEquals(4, cellsInSegments);

  final long secondLen = secondAddedLen / 3; // 2 cells duplicated in set 2
  assertEquals(firstLen + secondLen, regionServicesForStores.getMemstoreSize());
  final long secondHeap = 1 * heapPerCellOnCA;
  assertEquals(firstHeap + secondHeap, compactingMemStore.heapSize());

  flushableSize = memstore.getFlushableSize();
  final MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
  region.decrMemstoreSize(flushableSize); // simulate flusher
  final ImmutableSegment snapshotSegment = memstore.getSnapshot();
  assertEquals(4, snapshotSegment.getCellsCount());
  assertEquals(0, regionServicesForStores.getMemstoreSize());

  memstore.clearSnapshot(snapshot.getId());
}
/**
 * Adds three batches of cells: the first is flushed in memory with compaction enabled, the
 * second with compaction disabled, the third with compaction enabled again. Data size and heap
 * size are checked at every step; at the end only four cells are expected to remain.
 */
public void testCompaction3Buckets() throws IOException {
  final int heapPerCellOnCSLM = 120;
  final int heapPerCellOnCA = 88;
  final String[] firstKeys = { "A", "A", "B", "C" };
  final String[] secondKeys = { "A", "B", "D" };
  final String[] thirdKeys = { "D", "B", "B" };

  // first batch, flushed with compaction
  final long firstAddedLen = addRowsByKeys(memstore, firstKeys);
  final long firstHeapOnCSLM = 4 * heapPerCellOnCSLM;
  assertEquals(firstAddedLen, region.getMemstoreSize());
  final CompactingMemStore compactingMemStore = (CompactingMemStore) memstore;
  assertEquals(firstHeapOnCSLM, compactingMemStore.heapSize());

  MemstoreSize flushableSize = memstore.getFlushableSize();
  compactingMemStore.flushInMemory(); // push keys to pipeline and compact
  assertEquals(0, memstore.getSnapshot().getCellsCount());

  // One cell is duplicated and the compaction will remove it. All cells are of the same size,
  // so the expected data size is adjusted proportionally
  final long firstLen = (firstAddedLen * 3) / 4;
  final long firstHeap = 3 * heapPerCellOnCA;
  assertEquals(firstLen, regionServicesForStores.getMemstoreSize());
  assertEquals(firstHeap, compactingMemStore.heapSize());

  // second batch, flushed without compaction
  final long secondAddedLen = addRowsByKeys(memstore, secondKeys);
  final long secondHeap = 3 * heapPerCellOnCSLM;
  assertEquals(firstLen + secondAddedLen, regionServicesForStores.getMemstoreSize());
  assertEquals(firstHeap + secondHeap, compactingMemStore.heapSize());

  compactingMemStore.disableCompaction();
  flushableSize = memstore.getFlushableSize();
  compactingMemStore.flushInMemory(); // push keys to pipeline without compaction
  assertEquals(0, memstore.getSnapshot().getCellsCount());
  assertEquals(firstLen + secondAddedLen, regionServicesForStores.getMemstoreSize());
  assertEquals(firstHeap + secondHeap, compactingMemStore.heapSize());

  // third batch, flushed with compaction re-enabled
  final long thirdAddedLen = addRowsByKeys(memstore, thirdKeys);
  final long thirdHeap = 3 * heapPerCellOnCSLM;
  assertEquals(firstLen + secondAddedLen + thirdAddedLen,
      regionServicesForStores.getMemstoreSize());
  assertEquals(firstHeap + secondHeap + thirdHeap, compactingMemStore.heapSize());

  compactingMemStore.enableCompaction();
  flushableSize = memstore.getFlushableSize();
  compactingMemStore.flushInMemory(); // push keys to pipeline and compact
  while (compactingMemStore.isMemStoreFlushingInMemory()) {
    Threads.sleep(10);
  }
  assertEquals(0, memstore.getSnapshot().getCellsCount());

  // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells.
  // Out of total 10, only 4 cells are unique
  final long secondLen = secondAddedLen / 3; // 2 out of 3 cells are duplicated
  final long thirdLen = 0; // All duplicated cells.
  assertEquals(firstLen + secondLen + thirdLen, regionServicesForStores.getMemstoreSize());
  // Only 4 unique cells left
  assertEquals(4 * heapPerCellOnCA, compactingMemStore.heapSize());

  flushableSize = memstore.getFlushableSize();
  final MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
  region.decrMemstoreSize(flushableSize); // simulate flusher
  final ImmutableSegment snapshotSegment = memstore.getSnapshot();
  assertEquals(4, snapshotSegment.getCellsCount());
  assertEquals(0, regionServicesForStores.getMemstoreSize());

  memstore.clearSnapshot(snapshot.getId());
}
//////////////////////////////////////////////////////////////////////////////
// Merging tests
//////////////////////////////////////////////////////////////////////////////
/**
 * With the BASIC policy, adds three batches of cells interleaved with in-memory flushes (the
 * second flush runs with compaction disabled) and checks that the total number of cells over
 * all segments is 12 after the second batch and stays 16 after the third batch and the final
 * flush, i.e. that no cell is dropped.
 */
@Test
public void testMerging() throws IOException {
  final String[] firstKeys = { "A", "A", "B", "C", "F", "H"};
  final String[] secondKeys = { "A", "B", "D", "G", "I", "J"};
  final String[] thirdKeys = { "D", "B", "B", "E" };

  final MemoryCompactionPolicy policy = MemoryCompactionPolicy.BASIC;
  memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(policy));
  final CompactingMemStore compactingMemStore = (CompactingMemStore) memstore;
  compactingMemStore.initiateType(policy);
  addRowsByKeys(memstore, firstKeys);

  compactingMemStore.flushInMemory(); // push keys to pipeline should not compact
  while (compactingMemStore.isMemStoreFlushingInMemory()) {
    Threads.sleep(10);
  }
  assertEquals(0, memstore.getSnapshot().getCellsCount());

  addRowsByKeys(memstore, secondKeys); // also should only flatten
  int cellsAfterSecondBatch = 0;
  for (Segment segment : memstore.getSegments()) {
    cellsAfterSecondBatch += segment.getCellsCount();
  }
  assertEquals(12, cellsAfterSecondBatch);

  compactingMemStore.disableCompaction();
  compactingMemStore.flushInMemory(); // push keys to pipeline without flattening
  assertEquals(0, memstore.getSnapshot().getCellsCount());
  int cellsAfterSecondFlush = 0;
  for (Segment segment : memstore.getSegments()) {
    cellsAfterSecondFlush += segment.getCellsCount();
  }
  assertEquals(12, cellsAfterSecondFlush);

  addRowsByKeys(memstore, thirdKeys);
  int cellsAfterThirdBatch = 0;
  for (Segment segment : memstore.getSegments()) {
    cellsAfterThirdBatch += segment.getCellsCount();
  }
  assertEquals(16, cellsAfterThirdBatch);

  compactingMemStore.enableCompaction();
  compactingMemStore.flushInMemory(); // push keys to pipeline and compact
  while (compactingMemStore.isMemStoreFlushingInMemory()) {
    Threads.sleep(10);
  }
  assertEquals(0, memstore.getSnapshot().getCellsCount());
  int cellsAfterLastFlush = 0;
  for (Segment segment : memstore.getSegments()) {
    cellsAfterLastFlush += segment.getCellsCount();
  }
  assertEquals(16, cellsAfterLastFlush);

  final MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
  final ImmutableSegment snapshotSegment = memstore.getSnapshot();
  memstore.clearSnapshot(snapshot.getId());
}
/**
 * Adds three rows of 50 columns each, triggers an in-memory flush, and verifies that scanning
 * every scanner returned by the memstore from {@code KeyValue.LOWESTKEY} yields 150 cells.
 */
@Test
public void testCountOfCellsAfterFlatteningByScan() throws IOException {
  String[] keys1 = { "A", "B", "C" }; // A, B, C
  addRowsByKeysWith50Cols(memstore, keys1);
  // this should only flatten as there are no duplicates
  ((CompactingMemStore) memstore).flushInMemory();
  while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
    Threads.sleep(10);
  }
  List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
  int count = 0;
  try {
    for (KeyValueScanner scanner : scanners) {
      // seek to the lowest possible key, then drain the scanner
      scanner.seek(KeyValue.LOWESTKEY);
      while (scanner.next() != null) {
        count++;
      }
    }
  } finally {
    // close the scanners even if seeking or iterating fails
    for (KeyValueScanner scanner : scanners) {
      scanner.close();
    }
  }
  // JUnit's argument order is (message, expected, actual)
  assertEquals("the count should be ", 150, count);
}
/**
 * Adds three rows of 50 columns each, triggers an in-memory flush, and verifies that a
 * {@code MemStoreMergerSegmentsIterator} built over the immutable segments returns 150 cells.
 */
@Test
public void testCountOfCellsAfterFlatteningByIterator() throws IOException {
  String[] keys1 = { "A", "B", "C" }; // A, B, C
  addRowsByKeysWith50Cols(memstore, keys1);
  // this should only flatten as there are no duplicates
  ((CompactingMemStore) memstore).flushInMemory();
  while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
    Threads.sleep(10);
  }
  // Only count the cells the iterator hands out; their content is not inspected
  MemStoreSegmentsIterator itr = new MemStoreMergerSegmentsIterator(
      ((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(),
      CellComparator.COMPARATOR, 10);
  int cnt = 0;
  try {
    while (itr.next() != null) {
      cnt++;
    }
  } finally {
    itr.close();
  }
  // JUnit's argument order is (message, expected, actual)
  assertEquals("the count should be ", 150, cnt);
}
/**
 * Adds one row per key to the given memstore, each row holding 50 cells in family
 * "testfamily" with qualifiers "testqualifier0" .. "testqualifier49". All cells of a row
 * share one timestamp, taken just before that row is written.
 *
 * @param hmc  the memstore that receives the cells
 * @param keys the row keys; each cell value is the key followed by the column index
 */
private void addRowsByKeysWith50Cols(AbstractMemStore hmc, String[] keys) {
  final byte[] family = Bytes.toBytes("testfamily");
  for (String key : keys) {
    final long ts = System.currentTimeMillis();
    // pause after taking the timestamp; intended to give the next row a different one
    Threads.sleep(1);
    final byte[] rowKey = Bytes.toBytes(key);
    int col = 0;
    while (col < 50) {
      hmc.add(new KeyValue(rowKey, family, Bytes.toBytes("testqualifier" + col), ts,
          Bytes.toBytes(key + col)), null);
      col++;
    }
  }
}
/**
 * Checks the size of the chunk pool around {@code clearSnapshot} in two situations: when
 * memstore scanners are still open while the snapshot is cleared (the pool is expected to
 * stay empty until those scanners are closed), and when all scanners are closed before the
 * snapshot is cleared (the pool is expected to be non-empty right after clearing).
 */
@Override
@Test
public void testPuttingBackChunksWithOpeningScanner() throws IOException {
  byte[] row = Bytes.toBytes("testrow");
  byte[] fam = Bytes.toBytes("testfamily");
  byte[] qf1 = Bytes.toBytes("testqualifier1");
  byte[] qf2 = Bytes.toBytes("testqualifier2");
  byte[] qf3 = Bytes.toBytes("testqualifier3");
  byte[] qf4 = Bytes.toBytes("testqualifier4");
  byte[] qf5 = Bytes.toBytes("testqualifier5");
  byte[] qf6 = Bytes.toBytes("testqualifier6");
  byte[] qf7 = Bytes.toBytes("testqualifier7");
  byte[] val = Bytes.toBytes("testval");

  // Setting up memstore
  memstore.add(new KeyValue(row, fam, qf1, val), null);
  memstore.add(new KeyValue(row, fam, qf2, val), null);
  memstore.add(new KeyValue(row, fam, qf3, val), null);

  // Creating a snapshot
  MemStoreSnapshot snapshot = memstore.snapshot();
  assertEquals(3, memstore.getSnapshot().getCellsCount());

  // Adding value to "new" memstore
  assertEquals(0, memstore.getActive().getCellsCount());
  memstore.add(new KeyValue(row, fam, qf4, val), null);
  memstore.add(new KeyValue(row, fam, qf5, val), null);
  assertEquals(2, memstore.getActive().getCellsCount());

  // open scanners before clearing the snapshot
  List<KeyValueScanner> scanners = memstore.getScanners(0);
  // The chunks must not be put back to the pool yet, since the scanners opened above
  // still rely on their data.
  // close the snapshot's own scanners
  for (KeyValueScanner scanner : snapshot.getScanners()) {
    scanner.close();
  }
  memstore.clearSnapshot(snapshot.getId());
  // assertEquals reports the actual pool size on failure, unlike assertTrue(x == 0)
  assertEquals(0, chunkCreator.getPoolSize());

  // Chunks will be put back to pool after close scanners;
  for (KeyValueScanner scanner : scanners) {
    scanner.close();
  }
  assertTrue(chunkCreator.getPoolSize() > 0);

  // clear chunks
  chunkCreator.clearChunksInPool();

  // Creating another snapshot
  snapshot = memstore.snapshot();
  // Adding more value
  memstore.add(new KeyValue(row, fam, qf6, val), null);
  memstore.add(new KeyValue(row, fam, qf7, val), null);
  // opening scanners
  scanners = memstore.getScanners(0);
  // close scanners before clear the snapshot
  for (KeyValueScanner scanner : scanners) {
    scanner.close();
  }
  // Since no scanner is left open, the chunks of the snapshot should be put back to
  // the pool once the snapshot is cleared.
  // close the snapshot's own scanners
  for (KeyValueScanner scanner : snapshot.getScanners()) {
    scanner.close();
  }
  memstore.clearSnapshot(snapshot.getId());
  assertTrue(chunkCreator.getPoolSize() > 0);
}
/**
 * Writes three cells, snapshots them, writes two more cells, then closes the snapshot's
 * scanners and clears the snapshot. The test passes when the chunk pool is non-empty
 * afterwards.
 */
@Test
public void testPuttingBackChunksAfterFlushing() throws IOException {
  final byte[] rowKey = Bytes.toBytes("testrow");
  final byte[] family = Bytes.toBytes("testfamily");
  final byte[][] qualifiers = {
    Bytes.toBytes("testqualifier1"),
    Bytes.toBytes("testqualifier2"),
    Bytes.toBytes("testqualifier3"),
    Bytes.toBytes("testqualifier4"),
    Bytes.toBytes("testqualifier5"),
  };
  final byte[] value = Bytes.toBytes("testval");

  // Fill the memstore with the first three cells
  for (int i = 0; i < 3; i++) {
    memstore.add(new KeyValue(rowKey, family, qualifiers[i], value), null);
  }

  // Take a snapshot; it is expected to hold exactly those three cells
  final MemStoreSnapshot flushed = memstore.snapshot();
  assertEquals(3, memstore.getSnapshot().getCellsCount());

  // The active segment starts out empty and then receives the remaining two cells
  assertEquals(0, memstore.getActive().getCellsCount());
  for (int i = 3; i < qualifiers.length; i++) {
    memstore.add(new KeyValue(rowKey, family, qualifiers[i], value), null);
  }
  assertEquals(2, memstore.getActive().getCellsCount());

  // Close the snapshot's scanners before clearing the snapshot itself
  for (KeyValueScanner snapshotScanner : flushed.getScanners()) {
    snapshotScanner.close();
  }
  memstore.clearSnapshot(flushed.getId());

  // The chunk pool must hold at least one chunk now
  assertTrue(chunkCreator.getPoolSize() > 0);
}
/**
 * Adds one cell per key (family "testfamily", qualifier "testqualifier") to the given
 * memstore, accumulating the size change in a fresh {@code MemstoreSize}, which is then
 * added to {@code regionServicesForStores}.
 *
 * @param hmc  the memstore that receives the cells
 * @param keys the row keys; each cell value is the key followed by its index in the array
 * @return the data size reported by the accumulated {@code MemstoreSize}
 */
private long addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
  final byte[] family = Bytes.toBytes("testfamily");
  final byte[] qualifier = Bytes.toBytes("testqualifier");
  final MemstoreSize accumulated = new MemstoreSize();
  int idx = 0;
  while (idx < keys.length) {
    final long ts = System.currentTimeMillis();
    // pause after taking the timestamp; intended to give every cell a different one
    Threads.sleep(1);
    final String key = keys[idx];
    final KeyValue cell =
        new KeyValue(Bytes.toBytes(key), family, qualifier, ts, Bytes.toBytes(key + idx));
    hmc.add(cell, accumulated);
    LOG.debug("added kv: " + cell.getKeyString() + ", timestamp" + cell.getTimestamp());
    idx++;
  }
  regionServicesForStores.addMemstoreSize(accumulated);
  return accumulated.getDataSize();
}
}

View File

@ -965,9 +965,12 @@ public class TestDefaultMemStore {
conf, FSTableDescriptors.createMetaTableDescriptor(conf),
wFactory.getMetaWAL(HRegionInfo.FIRST_META_REGIONINFO.
getEncodedNameAsBytes()));
HRegionInfo hri = new HRegionInfo(TableName.valueOf(name.getMethodName()),
// parameterized tests add [#] suffix get rid of [ and ].
HRegionInfo hri =
new HRegionInfo(TableName.valueOf(name.getMethodName().replaceAll("[\\[\\]]", "_")),
Bytes.toBytes("row_0200"), Bytes.toBytes("row_0300"));
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(
name.getMethodName().replaceAll("[\\[\\]]", "_")));
desc.addFamily(new HColumnDescriptor("foo".getBytes()));
HRegion r =
HRegion.createHRegion(hri, testDir, conf, desc,

View File

@ -787,7 +787,8 @@ public class TestHRegionReplayEvents {
Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
long newFlushableSize = store.getFlushableSize();
if (droppableMemstore) {
assertTrue(newFlushableSize == 0); // assert that the memstore is dropped
// assert that the memstore is dropped
assertTrue(newFlushableSize == MutableSegment.DEEP_OVERHEAD);
} else {
assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped
}
@ -877,7 +878,7 @@ public class TestHRegionReplayEvents {
}
Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
long newFlushableSize = store.getFlushableSize();
assertTrue(newFlushableSize == 0);
assertTrue(newFlushableSize == MutableSegment.DEEP_OVERHEAD);
// assert that the region memstore is empty
long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();

View File

@ -194,7 +194,7 @@ public class TestPerColumnFamilyFlush {
// We should have cleared out only CF1, since we chose the flush thresholds
// and number of puts accordingly.
assertEquals(0, cf1MemstoreSize.getDataSize());
assertEquals(0, cf1MemstoreSize.getHeapSize());
assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
// Nothing should have happened to CF2, ...
assertEquals(cf2MemstoreSize, oldCF2MemstoreSize);
// ... or CF3
@ -231,9 +231,9 @@ public class TestPerColumnFamilyFlush {
// CF1 and CF2, both should be absent.
assertEquals(0, cf1MemstoreSize.getDataSize());
assertEquals(0, cf1MemstoreSize.getHeapSize());
assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
assertEquals(0, cf2MemstoreSize.getDataSize());
assertEquals(0, cf2MemstoreSize.getHeapSize());
assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize());
// CF3 shouldn't have been touched.
assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
assertEquals(totalMemstoreSize, cf3MemstoreSize.getDataSize());
@ -314,11 +314,11 @@ public class TestPerColumnFamilyFlush {
// Everything should have been cleared
assertEquals(0, cf1MemstoreSize.getDataSize());
assertEquals(0, cf1MemstoreSize.getHeapSize());
assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
assertEquals(0, cf2MemstoreSize.getDataSize());
assertEquals(0, cf2MemstoreSize.getHeapSize());
assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize());
assertEquals(0, cf3MemstoreSize.getDataSize());
assertEquals(0, cf3MemstoreSize.getHeapSize());
assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSize.getHeapSize());
assertEquals(0, totalMemstoreSize);
assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore);
HBaseTestingUtility.closeRegionAndWAL(region);
@ -525,9 +525,9 @@ public class TestPerColumnFamilyFlush {
});
LOG.info("Finished waiting on flush after too many WALs...");
// Individual families should have been flushed.
assertEquals(0, desiredRegion.getStore(FAMILY1).getMemStoreSize());
assertEquals(0, desiredRegion.getStore(FAMILY2).getMemStoreSize());
assertEquals(0, desiredRegion.getStore(FAMILY3).getMemStoreSize());
assertEquals(MutableSegment.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY1).getMemStoreSize());
assertEquals(MutableSegment.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY2).getMemStoreSize());
assertEquals(MutableSegment.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY3).getMemStoreSize());
// let WAL cleanOldLogs
assertNull(getWAL(desiredRegion).rollWriter(true));
assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs);

View File

@ -252,6 +252,8 @@ public class TestStore {
LOG.info("Adding some data");
MemstoreSize kvSize = new MemstoreSize();
store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
// add the heap size of active (mutable) segment
kvSize.incMemstoreSize(0, MutableSegment.DEEP_OVERHEAD);
size = store.memstore.getFlushableSize();
Assert.assertEquals(kvSize, size);
// Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right.
@ -262,10 +264,14 @@ public class TestStore {
} catch (IOException ioe) {
Assert.assertTrue(ioe.getMessage().contains("Fault injected"));
}
// due to snapshot, change mutable to immutable segment
kvSize.incMemstoreSize(0,
CSLMImmutableSegment.DEEP_OVERHEAD_CSLM-MutableSegment.DEEP_OVERHEAD);
size = store.memstore.getFlushableSize();
Assert.assertEquals(kvSize, size);
MemstoreSize kvSize2 = new MemstoreSize();
store.add(new KeyValue(row, family, qf2, 2, (byte[])null), kvSize2);
kvSize2.incMemstoreSize(0, MutableSegment.DEEP_OVERHEAD);
// Even though we add a new kv, we expect the flushable size to be 'same' since we have
// not yet cleared the snapshot -- the above flush failed.
Assert.assertEquals(kvSize, size);
@ -277,7 +283,7 @@ public class TestStore {
flushStore(store, id++);
size = store.memstore.getFlushableSize();
assertEquals(0, size.getDataSize());
assertEquals(0, size.getHeapSize());
assertEquals(MutableSegment.DEEP_OVERHEAD, size.getHeapSize());
return null;
}
});

View File

@ -241,7 +241,7 @@ public class TestWalAndCompactingMemStoreFlush {
// CF2 should become empty
assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
assertEquals(0, cf2MemstoreSizePhaseII.getHeapSize());
assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize());
// verify that CF3 was flushed to memory and was compacted (this is approximation check)
assertTrue(cf3MemstoreSizePhaseI.getDataSize() > cf3MemstoreSizePhaseII.getDataSize());
@ -302,7 +302,7 @@ public class TestWalAndCompactingMemStoreFlush {
// CF2 should be flushed to disk
assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize());
assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize());
assertEquals(0, cf2MemstoreSizePhaseIV.getHeapSize());
assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize());
// CF3 shouldn't have been touched.
assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
@ -326,11 +326,11 @@ public class TestWalAndCompactingMemStoreFlush {
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
assertEquals(0, cf1MemstoreSizePhaseV.getDataSize());
assertEquals(0, cf1MemstoreSizePhaseV.getHeapSize());
assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize());
assertEquals(0, cf2MemstoreSizePhaseV.getDataSize());
assertEquals(0, cf2MemstoreSizePhaseV.getHeapSize());
assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize());
assertEquals(0, cf3MemstoreSizePhaseV.getDataSize());
assertEquals(0, cf3MemstoreSizePhaseV.getHeapSize());
assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize());
// What happens when we hit the memstore limit, but we are not able to find
// any Column Family above the threshold?
@ -476,7 +476,7 @@ public class TestWalAndCompactingMemStoreFlush {
assertTrue(cf1MemstoreSizePhaseII.getHeapSize() < cf1MemstoreSizePhaseI.getHeapSize());
// CF2 should become empty
assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
assertEquals(0, cf2MemstoreSizePhaseII.getHeapSize());
assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize());
// verify that CF3 was flushed to memory and was not compacted (this is an approximation check)
// if compacted CF# should be at least twice less because its every key was duplicated
assertEquals(cf3MemstoreSizePhaseII.getDataSize() , cf3MemstoreSizePhaseI.getDataSize());
@ -544,7 +544,7 @@ public class TestWalAndCompactingMemStoreFlush {
// CF2 should remain empty
assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize());
assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize());
assertEquals(0, cf2MemstoreSizePhaseIV.getHeapSize());
assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize());
// CF3 shouldn't have been touched.
assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
// the smallest LSN of CF3 shouldn't change
@ -573,11 +573,11 @@ public class TestWalAndCompactingMemStoreFlush {
/*------------------------------------------------------------------------------*/
/* PHASE V - validation */
assertEquals(0, cf1MemstoreSizePhaseV.getDataSize());
assertEquals(0, cf1MemstoreSizePhaseV.getHeapSize());
assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize());
assertEquals(0, cf2MemstoreSizePhaseV.getDataSize());
assertEquals(0, cf2MemstoreSizePhaseV.getHeapSize());
assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize());
assertEquals(0, cf3MemstoreSizePhaseV.getDataSize());
assertEquals(0, cf3MemstoreSizePhaseV.getHeapSize());
assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize());
// The total memstores size should be empty
assertEquals(0, totalMemstoreSizePhaseV);
// Because there is nothing in any memstore the WAL's LSN should be -1
@ -699,7 +699,7 @@ public class TestWalAndCompactingMemStoreFlush {
// CF2 should have been cleared
assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
assertEquals(0, cf2MemstoreSizePhaseII.getHeapSize());
assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize());
String s = "\n\n----------------------------------\n"
+ "Upon initial insert and flush, LSN of CF1 is:"
@ -875,9 +875,13 @@ public class TestWalAndCompactingMemStoreFlush {
MemstoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getSizeOfMemStore();
assertEquals(2*cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseIV.getDataSize());
// the decrease in the heap size due to usage of CellArrayMap instead of CSLM
// should be the same in flattening and in merge (first and second in-memory-flush)
// but in phase 1 we do not yet have immutable segment
assertEquals(
cf1MemstoreSizePhaseI.getHeapSize() - cf1MemstoreSizePhaseII.getHeapSize(),
cf1MemstoreSizePhaseIII.getHeapSize() - cf1MemstoreSizePhaseIV.getHeapSize());
cf1MemstoreSizePhaseIII.getHeapSize() - cf1MemstoreSizePhaseIV.getHeapSize()
- CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
assertEquals(3, // active, one in pipeline, snapshot
((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).getSegments().size());
// CF2 should have been cleared