HBASE-18010: CellChunkMap integration into CompactingMemStore, merge to Release 2.0

This commit is contained in:
anastas 2017-10-01 11:01:10 +03:00
parent 7ee44a820f
commit 95405fbd83
23 changed files with 403 additions and 823 deletions

View File

@ -89,6 +89,15 @@ public class ClassSize {
/** Overhead for ConcurrentSkipListMap Entry */ /** Overhead for ConcurrentSkipListMap Entry */
public static final int CONCURRENT_SKIPLISTMAP_ENTRY; public static final int CONCURRENT_SKIPLISTMAP_ENTRY;
/** Overhead for CellFlatMap */
public static final int CELL_FLAT_MAP;
/** Overhead for CellChunkMap */
public static final int CELL_CHUNK_MAP;
/** Overhead for Cell Chunk Map Entry */
public static final int CELL_CHUNK_MAP_ENTRY;
/** Overhead for CellArrayMap */ /** Overhead for CellArrayMap */
public static final int CELL_ARRAY_MAP; public static final int CELL_ARRAY_MAP;
@ -275,13 +284,17 @@ public class ClassSize {
// The size changes from jdk7 to jdk8, estimate the size rather than use a conditional // The size changes from jdk7 to jdk8, estimate the size rather than use a conditional
CONCURRENT_SKIPLISTMAP = (int) estimateBase(ConcurrentSkipListMap.class, false); CONCURRENT_SKIPLISTMAP = (int) estimateBase(ConcurrentSkipListMap.class, false);
// CELL_ARRAY_MAP is the size of an instance of CellArrayMap class, which extends
// CellFlatMap class. CellArrayMap object containing a ref to an Array, so
// OBJECT + REFERENCE + ARRAY
// CellFlatMap object contains two integers, one boolean and one reference to object, so // CellFlatMap object contains two integers, one boolean and one reference to object, so
// 2*INT + BOOLEAN + REFERENCE // 2*INT + BOOLEAN + REFERENCE
CELL_ARRAY_MAP = align(OBJECT + 2*Bytes.SIZEOF_INT + Bytes.SIZEOF_BOOLEAN CELL_FLAT_MAP = OBJECT + 2*Bytes.SIZEOF_INT + Bytes.SIZEOF_BOOLEAN + REFERENCE;
+ ARRAY + 2*REFERENCE);
// CELL_ARRAY_MAP is the size of an instance of CellArrayMap class, which extends
// CellFlatMap class. CellArrayMap object containing a ref to an Array of Cells
CELL_ARRAY_MAP = align(CELL_FLAT_MAP + REFERENCE + ARRAY);
// CELL_CHUNK_MAP is the size of an instance of CellChunkMap class, which extends
// CellFlatMap class. CellChunkMap object containing a ref to an Array of Chunks
CELL_CHUNK_MAP = align(CELL_FLAT_MAP + REFERENCE + ARRAY);
CONCURRENT_SKIPLISTMAP_ENTRY = align( CONCURRENT_SKIPLISTMAP_ENTRY = align(
align(OBJECT + (3 * REFERENCE)) + /* one node per entry */ align(OBJECT + (3 * REFERENCE)) + /* one node per entry */
@ -290,6 +303,12 @@ public class ClassSize {
// REFERENCE in the CellArrayMap all the rest is counted in KeyValue.heapSize() // REFERENCE in the CellArrayMap all the rest is counted in KeyValue.heapSize()
CELL_ARRAY_MAP_ENTRY = align(REFERENCE); CELL_ARRAY_MAP_ENTRY = align(REFERENCE);
// The Cell Representation in the CellChunkMap, the Cell object size shouldn't be counted
// in KeyValue.heapSize()
// each cell-representation requires three integers for chunkID (reference to the ByteBuffer),
// offset and length, and one long for seqID
CELL_CHUNK_MAP_ENTRY = 3*Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG;
REENTRANT_LOCK = align(OBJECT + (3 * REFERENCE)); REENTRANT_LOCK = align(OBJECT + (3 * REFERENCE));
ATOMIC_LONG = align(OBJECT + Bytes.SIZEOF_LONG); ATOMIC_LONG = align(OBJECT + Bytes.SIZEOF_LONG);

View File

@ -21,11 +21,12 @@ package org.apache.hadoop.hbase.regionserver;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.ClassSize;
import java.util.Comparator; import java.util.Comparator;
@ -56,11 +57,12 @@ import java.util.Comparator;
public class CellChunkMap extends CellFlatMap { public class CellChunkMap extends CellFlatMap {
private final Chunk[] chunks; // the array of chunks, on which the index is based private final Chunk[] chunks; // the array of chunks, on which the index is based
private final int numOfCellsInsideChunk; // constant number of cell-representations in a chunk
// each cell-representation requires three integers for chunkID (reference to the ByteBuffer), // constant number of cell-representations in a chunk
// offset and length, and one long for seqID // each chunk starts with its own ID following the cells data
public static final int SIZEOF_CELL_REP = 3*Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG ; public static final int NUM_OF_CELL_REPS_IN_CHUNK =
(ChunkCreator.getInstance().getChunkSize() - ChunkCreator.SIZEOF_CHUNK_HEADER) /
ClassSize.CELL_CHUNK_MAP_ENTRY;
/** /**
* C-tor for creating CellChunkMap from existing Chunk array, which must be ordered * C-tor for creating CellChunkMap from existing Chunk array, which must be ordered
@ -75,9 +77,6 @@ public class CellChunkMap extends CellFlatMap {
Chunk[] chunks, int min, int max, boolean descending) { Chunk[] chunks, int min, int max, boolean descending) {
super(comparator, min, max, descending); super(comparator, min, max, descending);
this.chunks = chunks; this.chunks = chunks;
this.numOfCellsInsideChunk = // each chunk starts with its own ID following the cells data
(ChunkCreator.getInstance().getChunkSize() - Bytes.SIZEOF_INT) / SIZEOF_CELL_REP;
} }
/* To be used by base (CellFlatMap) class only to create a sub-CellFlatMap /* To be used by base (CellFlatMap) class only to create a sub-CellFlatMap
@ -91,20 +90,20 @@ public class CellChunkMap extends CellFlatMap {
@Override @Override
protected Cell getCell(int i) { protected Cell getCell(int i) {
// get the index of the relevant chunk inside chunk array // get the index of the relevant chunk inside chunk array
int chunkIndex = (i / numOfCellsInsideChunk); int chunkIndex = (i / NUM_OF_CELL_REPS_IN_CHUNK);
ByteBuffer block = chunks[chunkIndex].getData();// get the ByteBuffer of the relevant chunk ByteBuffer block = chunks[chunkIndex].getData();// get the ByteBuffer of the relevant chunk
int j = i - chunkIndex * numOfCellsInsideChunk; // get the index of the cell-representation int j = i - chunkIndex * NUM_OF_CELL_REPS_IN_CHUNK; // get the index of the cell-representation
// find inside the offset inside the chunk holding the index, skip bytes for chunk id // find inside the offset inside the chunk holding the index, skip bytes for chunk id
int offsetInBytes = Bytes.SIZEOF_INT + j* SIZEOF_CELL_REP; int offsetInBytes = ChunkCreator.SIZEOF_CHUNK_HEADER + j* ClassSize.CELL_CHUNK_MAP_ENTRY;
// find the chunk holding the data of the cell, the chunkID is stored first // find the chunk holding the data of the cell, the chunkID is stored first
int chunkId = ByteBufferUtils.toInt(block, offsetInBytes); int chunkId = ByteBufferUtils.toInt(block, offsetInBytes);
Chunk chunk = ChunkCreator.getInstance().getChunk(chunkId); Chunk chunk = ChunkCreator.getInstance().getChunk(chunkId);
if (chunk == null) { if (chunk == null) {
// this should not happen, putting an assertion here at least for the testing period // this should not happen
assert false; throw new IllegalArgumentException("In CellChunkMap, cell must be associated with chunk."
+ ". We were looking for a cell at index " + i);
} }
// find the offset of the data of the cell, skip integer for chunkID, offset is stored second // find the offset of the data of the cell, skip integer for chunkID, offset is stored second
@ -118,8 +117,10 @@ public class CellChunkMap extends CellFlatMap {
ByteBuffer buf = chunk.getData(); // get the ByteBuffer where the cell data is stored ByteBuffer buf = chunk.getData(); // get the ByteBuffer where the cell data is stored
if (buf == null) { if (buf == null) {
// this should not happen, putting an assertion here at least for the testing period // this should not happen
assert false; throw new IllegalArgumentException("In CellChunkMap, chunk must be associated with ByteBuffer."
+ " Chunk: " + chunk + " Chunk ID: " + chunk.getId() + ", is from pool: "
+ chunk.isFromPool() + ". We were looking for a cell at index " + i);
} }
return new ByteBufferChunkCell(buf, offsetOfCell, lengthOfCell, cellSeqID); return new ByteBufferChunkCell(buf, offsetOfCell, lengthOfCell, cellSeqID);

View File

@ -1,4 +1,3 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
@ -18,7 +17,7 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import java.lang.ref.SoftReference; import java.lang.ref.WeakReference;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -36,10 +35,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver; import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
/** /**
* Does the management of memstoreLAB chunk creations. A monotonically incrementing id is associated * Does the management of memstoreLAB chunk creations. A monotonically incrementing id is associated
@ -51,7 +51,14 @@ public class ChunkCreator {
// monotonically increasing chunkid // monotonically increasing chunkid
private AtomicInteger chunkID = new AtomicInteger(1); private AtomicInteger chunkID = new AtomicInteger(1);
// maps the chunk against the monotonically increasing chunk id. We need to preserve the // maps the chunk against the monotonically increasing chunk id. We need to preserve the
// natural ordering of the key. It also helps to protect from GC. // natural ordering of the key
// CellChunkMap creation should convert the weak ref to hard reference
// chunk id of each chunk is the first integer written on each chunk,
// the header size need to be changed in case chunk id size is changed
public static final int SIZEOF_CHUNK_HEADER = Bytes.SIZEOF_INT;
// mapping from chunk IDs to chunks
private Map<Integer, Chunk> chunkIdMap = new ConcurrentHashMap<Integer, Chunk>(); private Map<Integer, Chunk> chunkIdMap = new ConcurrentHashMap<Integer, Chunk>();
private final int chunkSize; private final int chunkSize;
@ -137,10 +144,6 @@ public class ChunkCreator {
return chunk; return chunk;
} }
private Chunk createChunkForPool() {
return createChunk(true, CompactingMemStore.IndexType.ARRAY_MAP);
}
/** /**
* Creates the chunk either onheap or offheap * Creates the chunk either onheap or offheap
* @param pool indicates if the chunks have to be created which will be used by the Pool * @param pool indicates if the chunks have to be created which will be used by the Pool
@ -226,7 +229,8 @@ public class ChunkCreator {
this.reclaimedChunks = new LinkedBlockingQueue<>(); this.reclaimedChunks = new LinkedBlockingQueue<>();
for (int i = 0; i < initialCount; i++) { for (int i = 0; i < initialCount; i++) {
// Chunks from pool are covered with strong references anyway // Chunks from pool are covered with strong references anyway
Chunk chunk = createChunkForPool(); // TODO: change to CHUNK_MAP if it is generally defined
Chunk chunk = createChunk(true, CompactingMemStore.IndexType.ARRAY_MAP);
chunk.init(); chunk.init();
reclaimedChunks.add(chunk); reclaimedChunks.add(chunk);
} }
@ -258,7 +262,8 @@ public class ChunkCreator {
long created = this.chunkCount.get(); long created = this.chunkCount.get();
if (created < this.maxCount) { if (created < this.maxCount) {
if (this.chunkCount.compareAndSet(created, created + 1)) { if (this.chunkCount.compareAndSet(created, created + 1)) {
chunk = createChunkForPool(); // TODO: change to CHUNK_MAP if it is generally defined
chunk = createChunk(true, CompactingMemStore.IndexType.ARRAY_MAP);
break; break;
} }
} else { } else {

View File

@ -314,7 +314,19 @@ public class CompactingMemStore extends AbstractMemStore {
* The flattening happens only if versions match. * The flattening happens only if versions match.
*/ */
public void flattenOneSegment(long requesterVersion) { public void flattenOneSegment(long requesterVersion) {
pipeline.flattenYoungestSegment(requesterVersion); pipeline.flattenOneSegment(requesterVersion, indexType);
}
// setter is used only for testability
@VisibleForTesting
public void setIndexType() {
indexType = IndexType.valueOf(getConfiguration().get(
CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
CompactingMemStore.COMPACTING_MEMSTORE_INDEX_DEFAULT));
}
public IndexType getIndexType() {
return indexType;
} }
public boolean hasImmutableSegments() { public boolean hasImmutableSegments() {

View File

@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.util.ClassSize;
* method accesses the read-only copy more than once it makes a local copy of it * method accesses the read-only copy more than once it makes a local copy of it
* to ensure it accesses the same copy. * to ensure it accesses the same copy.
* *
* The methods getVersionedList(), getVersionedTail(), and flattenYoungestSegment() are also * The methods getVersionedList(), getVersionedTail(), and flattenOneSegment() are also
* protected by a lock since they need to have a consistent (atomic) view of the pipeline list * protected by a lock since they need to have a consistent (atomic) view of the pipeline list
* and version number. * and version number.
*/ */
@ -183,7 +183,7 @@ public class CompactionPipeline {
* *
* @return true iff a segment was successfully flattened * @return true iff a segment was successfully flattened
*/ */
public boolean flattenYoungestSegment(long requesterVersion) { public boolean flattenOneSegment(long requesterVersion, CompactingMemStore.IndexType idxType) {
if(requesterVersion != version) { if(requesterVersion != version) {
LOG.warn("Segment flattening failed, because versions do not match. Requester version: " LOG.warn("Segment flattening failed, because versions do not match. Requester version: "
@ -196,17 +196,22 @@ public class CompactionPipeline {
LOG.warn("Segment flattening failed, because versions do not match"); LOG.warn("Segment flattening failed, because versions do not match");
return false; return false;
} }
int i = 0;
for (ImmutableSegment s : pipeline) { for (ImmutableSegment s : pipeline) {
// remember the old size in case this segment is going to be flatten if ( s.canBeFlattened() ) {
MemstoreSize memstoreSize = new MemstoreSize(); MemstoreSize newMemstoreSize = new MemstoreSize(); // the size to be updated
if (s.flatten(memstoreSize)) { ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
(CSLMImmutableSegment)s,idxType,newMemstoreSize);
replaceAtIndex(i,newS);
if(region != null) { if(region != null) {
region.addMemstoreSize(memstoreSize); // update the global memstore size counter
// upon flattening there is no change in the data size
region.addMemstoreSize(new MemstoreSize(0, newMemstoreSize.getHeapSize()));
} }
LOG.debug("Compaction pipeline segment " + s + " was flattened"); LOG.debug("Compaction pipeline segment " + s + " was flattened");
return true; return true;
} }
i++;
} }
} }
@ -271,6 +276,13 @@ public class CompactionPipeline {
} }
} }
// replacing one segment in the pipeline with a new one exactly at the same index
// need to be called only within synchronized block
private void replaceAtIndex(int idx, ImmutableSegment newSegment) {
pipeline.set(idx, newSegment);
readOnlyCopy = new LinkedList<>(pipeline);
}
public Segment getTail() { public Segment getTail() {
List<? extends Segment> localCopy = getSegments(); List<? extends Segment> localCopy = getSegments();
if(localCopy.isEmpty()) { if(localCopy.isEmpty()) {

View File

@ -184,6 +184,16 @@ public class CompositeImmutableSegment extends ImmutableSegment {
throw new IllegalStateException("Not supported by CompositeImmutableScanner"); throw new IllegalStateException("Not supported by CompositeImmutableScanner");
} }
@Override
protected long indexEntrySize() {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
@Override protected boolean canBeFlattened() {
return false;
}
/** /**
* @return Sum of all cell sizes. * @return Sum of all cell sizes.
*/ */

View File

@ -45,6 +45,15 @@ public class ImmutableMemStoreLAB implements MemStoreLAB {
throw new IllegalStateException("This is an Immutable MemStoreLAB."); throw new IllegalStateException("This is an Immutable MemStoreLAB.");
} }
@Override
// returning a new chunk, without replacing current chunk,
// the space on this chunk will be allocated externally
// use the first MemStoreLABImpl in the list
public Chunk getNewExternalChunk() {
MemStoreLAB mslab = this.mslabs.get(0);
return mslab.getNewExternalChunk();
}
@Override @Override
public void close() { public void close() {
// 'openScannerCount' here tracks the scanners opened on segments which directly refer to this // 'openScannerCount' here tracks the scanners opened on segments which directly refer to this

View File

@ -19,14 +19,11 @@
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.TimeRange;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
@ -36,21 +33,11 @@ import java.util.List;
* and is not needed for a {@link MutableSegment}. * and is not needed for a {@link MutableSegment}.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class ImmutableSegment extends Segment { public abstract class ImmutableSegment extends Segment {
private static final long DEEP_OVERHEAD = Segment.DEEP_OVERHEAD public static final long DEEP_OVERHEAD = Segment.DEEP_OVERHEAD
+ (2 * ClassSize.REFERENCE) // Refs to timeRange and type + ClassSize.align(ClassSize.REFERENCE // Referent to timeRange
+ ClassSize.TIMERANGE; + ClassSize.TIMERANGE);
public static final long DEEP_OVERHEAD_CSLM = DEEP_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP;
public static final long DEEP_OVERHEAD_CAM = DEEP_OVERHEAD + ClassSize.CELL_ARRAY_MAP;
/**
* Types of ImmutableSegment
*/
public enum Type {
SKIPLIST_MAP_BASED,
ARRAY_MAP_BASED,
}
/** /**
* This is an immutable segment so use the read-only TimeRange rather than the heavy-weight * This is an immutable segment so use the read-only TimeRange rather than the heavy-weight
@ -58,12 +45,8 @@ public class ImmutableSegment extends Segment {
*/ */
private final TimeRange timeRange; private final TimeRange timeRange;
private Type type = Type.SKIPLIST_MAP_BASED; // each sub-type of immutable segment knows whether it is flat or not
protected abstract boolean canBeFlattened();
// whether it is based on CellFlatMap or ConcurrentSkipListMap
private boolean isFlat(){
return (type != Type.SKIPLIST_MAP_BASED);
}
///////////////////// CONSTRUCTORS ///////////////////// ///////////////////// CONSTRUCTORS /////////////////////
/**------------------------------------------------------------------------ /**------------------------------------------------------------------------
@ -75,59 +58,25 @@ public class ImmutableSegment extends Segment {
} }
/**------------------------------------------------------------------------ /**------------------------------------------------------------------------
* Copy C-tor to be used when new ImmutableSegment is being built from a Mutable one. * C-tor to be used to build the derived classes
*/
protected ImmutableSegment(CellSet cs, CellComparator comparator, MemStoreLAB memStoreLAB) {
super(cs, comparator, memStoreLAB);
this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange();
}
/**------------------------------------------------------------------------
* Copy C-tor to be used when new CSLMImmutableSegment (derived) is being built from a Mutable one.
* This C-tor should be used when active MutableSegment is pushed into the compaction * This C-tor should be used when active MutableSegment is pushed into the compaction
* pipeline and becomes an ImmutableSegment. * pipeline and becomes an ImmutableSegment.
*/ */
protected ImmutableSegment(Segment segment) { protected ImmutableSegment(Segment segment) {
super(segment); super(segment);
this.type = Type.SKIPLIST_MAP_BASED;
this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange(); this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange();
} }
/**------------------------------------------------------------------------
* C-tor to be used when new CELL_ARRAY BASED ImmutableSegment is a result of compaction of a
* list of older ImmutableSegments.
* The given iterator returns the Cells that "survived" the compaction.
* The input parameter "type" exists for future use when more types of flat ImmutableSegments
* are going to be introduced.
*/
protected ImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
MemStoreLAB memStoreLAB, int numOfCells, Type type, boolean merge) {
super(null, // initiailize the CellSet with NULL
comparator, memStoreLAB);
this.type = type;
// build the new CellSet based on CellArrayMap
CellSet cs = createCellArrayMapSet(numOfCells, iterator, merge);
this.setCellSet(null, cs); // update the CellSet of the new Segment
this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange();
}
/**------------------------------------------------------------------------
* C-tor to be used when new SKIP-LIST BASED ImmutableSegment is a result of compaction of a
* list of older ImmutableSegments.
* The given iterator returns the Cells that "survived" the compaction.
*/
protected ImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
MemStoreLAB memStoreLAB) {
super(new CellSet(comparator), // initiailize the CellSet with empty CellSet
comparator, memStoreLAB);
type = Type.SKIPLIST_MAP_BASED;
while (iterator.hasNext()) {
Cell c = iterator.next();
// The scanner is doing all the elimination logic
// now we just copy it to the new segment
Cell newKV = maybeCloneWithAllocator(c);
boolean usedMSLAB = (newKV != c);
internalAdd(newKV, usedMSLAB, null);
}
this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange();
}
///////////////////// PUBLIC METHODS ///////////////////// ///////////////////// PUBLIC METHODS /////////////////////
@Override @Override
public boolean shouldSeek(TimeRange tr, long oldestUnexpiredTS) { public boolean shouldSeek(TimeRange tr, long oldestUnexpiredTS) {
return this.timeRange.includesTimeRange(tr) && return this.timeRange.includesTimeRange(tr) &&
@ -147,107 +96,4 @@ public class ImmutableSegment extends Segment {
List<Segment> res = new ArrayList<>(Arrays.asList(this)); List<Segment> res = new ArrayList<>(Arrays.asList(this));
return res; return res;
} }
/**------------------------------------------------------------------------
* Change the CellSet of this ImmutableSegment from one based on ConcurrentSkipListMap to one
* based on CellArrayMap.
* If this ImmutableSegment is not based on ConcurrentSkipListMap , this is NOOP
*
* Synchronization of the CellSet replacement:
* The reference to the CellSet is AtomicReference and is updated only when ImmutableSegment
* is constructed (single thread) or flattened. The flattening happens as part of a single
* thread of compaction, but to be on the safe side the initial CellSet is locally saved
* before the flattening and then replaced using CAS instruction.
*/
public boolean flatten(MemstoreSize memstoreSize) {
if (isFlat()) return false;
CellSet oldCellSet = getCellSet();
int numOfCells = getCellsCount();
// build the new (CellSet CellArrayMap based)
CellSet newCellSet = recreateCellArrayMapSet(numOfCells);
type = Type.ARRAY_MAP_BASED;
setCellSet(oldCellSet,newCellSet);
// arrange the meta-data size, decrease all meta-data sizes related to SkipList
// (recreateCellArrayMapSet doesn't take the care for the sizes)
long newSegmentSizeDelta = -(numOfCells * ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
// add size of CellArrayMap and meta-data overhead per Cell
newSegmentSizeDelta = newSegmentSizeDelta + numOfCells * ClassSize.CELL_ARRAY_MAP_ENTRY;
incSize(0, newSegmentSizeDelta);
if (memstoreSize != null) {
memstoreSize.incMemstoreSize(0, newSegmentSizeDelta);
}
return true;
}
///////////////////// PRIVATE METHODS /////////////////////
/*------------------------------------------------------------------------*/
// Create CellSet based on CellArrayMap from compacting iterator
private CellSet createCellArrayMapSet(int numOfCells, MemStoreSegmentsIterator iterator,
boolean merge) {
Cell[] cells = new Cell[numOfCells]; // build the Cell Array
int i = 0;
while (iterator.hasNext()) {
Cell c = iterator.next();
// The scanner behind the iterator is doing all the elimination logic
if (merge) {
// if this is merge we just move the Cell object without copying MSLAB
// the sizes still need to be updated in the new segment
cells[i] = c;
} else {
// now we just copy it to the new segment (also MSLAB copy)
cells[i] = maybeCloneWithAllocator(c);
}
boolean useMSLAB = (getMemStoreLAB()!=null);
// second parameter true, because in compaction/merge the addition of the cell to new segment
// is always successful
updateMetaInfo(c, true, useMSLAB, null); // updates the size per cell
i++;
}
// build the immutable CellSet
CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, i, false);
return new CellSet(cam);
}
@Override
protected long heapSizeChange(Cell cell, boolean succ) {
if (succ) {
switch (this.type) {
case SKIPLIST_MAP_BASED:
return super.heapSizeChange(cell, succ);
case ARRAY_MAP_BASED:
return ClassSize.align(ClassSize.CELL_ARRAY_MAP_ENTRY + CellUtil.estimatedHeapSizeOf(cell));
}
}
return 0;
}
/*------------------------------------------------------------------------*/
// Create CellSet based on CellArrayMap from current ConcurrentSkipListMap based CellSet
// (without compacting iterator)
private CellSet recreateCellArrayMapSet(int numOfCells) {
Cell[] cells = new Cell[numOfCells]; // build the Cell Array
Cell curCell;
int idx = 0;
// create this segment scanner with maximal possible read point, to go over all Cells
KeyValueScanner segmentScanner = this.getScanner(Long.MAX_VALUE);
try {
while ((curCell = segmentScanner.next()) != null) {
cells[idx++] = curCell;
}
} catch (IOException ie) {
throw new IllegalStateException(ie);
} finally {
segmentScanner.close();
}
// build the immutable CellSet
CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, idx, false);
return new CellSet(cam);
}
} }

View File

@ -80,7 +80,7 @@ public class MemStoreCompactor {
* Note that every value covers the previous ones, i.e. if MERGE is the action it implies * Note that every value covers the previous ones, i.e. if MERGE is the action it implies
* that the youngest segment is going to be flatten anyway. * that the youngest segment is going to be flatten anyway.
*/ */
private enum Action { public enum Action {
NOOP, NOOP,
FLATTEN, // flatten the youngest segment in the pipeline FLATTEN, // flatten the youngest segment in the pipeline
MERGE, // merge all the segments in the pipeline into one MERGE, // merge all the segments in the pipeline into one
@ -160,7 +160,7 @@ public class MemStoreCompactor {
if (action == Action.COMPACT) { // compact according to the user request if (action == Action.COMPACT) { // compact according to the user request
LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName() LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName()
+ " is going to be compacted, number of" + " is going to be compacted to the " + compactingMemStore.getIndexType() + ". Number of"
+ " cells before compaction is " + versionedList.getNumOfCells()); + " cells before compaction is " + versionedList.getNumOfCells());
return Action.COMPACT; return Action.COMPACT;
} }
@ -170,13 +170,15 @@ public class MemStoreCompactor {
int numOfSegments = versionedList.getNumOfSegments(); int numOfSegments = versionedList.getNumOfSegments();
if (numOfSegments > pipelineThreshold) { if (numOfSegments > pipelineThreshold) {
LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName() LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName()
+ " is going to be merged, as there are " + numOfSegments + " segments"); + " is going to be merged to the " + compactingMemStore.getIndexType()
+ ", as there are " + numOfSegments + " segments");
return Action.MERGE; // to avoid too many segments, merge now return Action.MERGE; // to avoid too many segments, merge now
} }
// if nothing of the above, then just flatten the newly joined segment // if nothing of the above, then just flatten the newly joined segment
LOG.debug("The youngest segment in the in-Memory Compaction Pipeline for store " LOG.debug("The youngest segment in the in-Memory Compaction Pipeline for store "
+ compactingMemStore.getFamilyName() + " is going to be flattened"); + compactingMemStore.getFamilyName() + " is going to be flattened to the "
+ compactingMemStore.getIndexType());
return Action.FLATTEN; return Action.FLATTEN;
} }
@ -250,21 +252,22 @@ public class MemStoreCompactor {
result = SegmentFactory.instance().createImmutableSegmentByCompaction( result = SegmentFactory.instance().createImmutableSegmentByCompaction(
compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
versionedList.getNumOfCells(), ImmutableSegment.Type.ARRAY_MAP_BASED); versionedList.getNumOfCells(), compactingMemStore.getIndexType());
iterator.close(); iterator.close();
break; break;
case MERGE: case MERGE:
iterator = new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(), iterator =
compactingMemStore.getComparator(), compactionKVMax); new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(),
compactingMemStore.getComparator(),
compactionKVMax);
result = SegmentFactory.instance().createImmutableSegmentByMerge( result = SegmentFactory.instance().createImmutableSegmentByMerge(
compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
versionedList.getNumOfCells(), ImmutableSegment.Type.ARRAY_MAP_BASED, versionedList.getNumOfCells(), versionedList.getStoreSegments(),
versionedList.getStoreSegments()); compactingMemStore.getIndexType());
iterator.close(); iterator.close();
break; break;
default: default: throw new RuntimeException("Unknown action " + action); // sanity check
throw new RuntimeException("Unknown action " + action); // sanity check
} }
return result; return result;

View File

@ -83,6 +83,12 @@ public interface MemStoreLAB {
*/ */
void decScannerCount(); void decScannerCount();
/**
* Return a new empty chunk without considering this chunk as current
* The space on this chunk will be allocated externally
*/
Chunk getNewExternalChunk();
public static MemStoreLAB newInstance(Configuration conf) { public static MemStoreLAB newInstance(Configuration conf) {
MemStoreLAB memStoreLAB = null; MemStoreLAB memStoreLAB = null;
if (isEnabled(conf)) { if (isEnabled(conf)) {

View File

@ -253,6 +253,17 @@ public class MemStoreLABImpl implements MemStoreLAB {
return null; return null;
} }
// Returning a new chunk, without replacing current chunk,
// meaning MSLABImpl does not make the returned chunk as CurChunk.
// The space on this chunk will be allocated externally
// The interface is only for external callers
@Override
public Chunk getNewExternalChunk() {
Chunk c = this.chunkCreator.getChunk();
chunks.add(c.getId());
return c;
}
@VisibleForTesting @VisibleForTesting
Chunk getCurrentChunk() { Chunk getCurrentChunk() {
return this.curChunk.get(); return this.curChunk.get();

View File

@ -42,6 +42,7 @@ public class MutableSegment extends Segment {
protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) { protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
super(cellSet, comparator, memStoreLAB); super(cellSet, comparator, memStoreLAB);
incSize(0,DEEP_OVERHEAD); // update the mutable segment metadata
} }
/** /**
@ -121,4 +122,8 @@ public class MutableSegment extends Segment {
public long getMinTimestamp() { public long getMinTimestamp() {
return this.timeRangeTracker.getMin(); return this.timeRangeTracker.getMin();
} }
@Override protected long indexEntrySize() {
return ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY;
}
} }

View File

@ -305,6 +305,10 @@ public abstract class Segment {
} }
} }
protected void updateMetaInfo(Cell cellToAdd, boolean succ, MemstoreSize memstoreSize) {
updateMetaInfo(cellToAdd, succ, (getMemStoreLAB()!=null), memstoreSize);
}
/** /**
* @return The increase in heap size because of this cell addition. This includes this cell POJO's * @return The increase in heap size because of this cell addition. This includes this cell POJO's
* heap size itself and additional overhead because of addition on to CSLM. * heap size itself and additional overhead because of addition on to CSLM.
@ -312,11 +316,13 @@ public abstract class Segment {
protected long heapSizeChange(Cell cell, boolean succ) { protected long heapSizeChange(Cell cell, boolean succ) {
if (succ) { if (succ) {
return ClassSize return ClassSize
.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + CellUtil.estimatedHeapSizeOf(cell)); .align(indexEntrySize() + CellUtil.estimatedHeapSizeOf(cell));
} }
return 0; return 0;
} }
protected abstract long indexEntrySize();
/** /**
* Returns a subset of the segment cell set, which starts with the given cell * Returns a subset of the segment cell set, which starts with the given cell
* @param firstCell a cell in the segment * @param firstCell a cell in the segment

View File

@ -18,7 +18,6 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -41,42 +40,36 @@ public final class SegmentFactory {
return instance; return instance;
} }
// create skip-list-based (non-flat) immutable segment from compacting old immutable segments
public ImmutableSegment createImmutableSegment(final Configuration conf,
final CellComparator comparator, MemStoreSegmentsIterator iterator) {
return new ImmutableSegment(comparator, iterator, MemStoreLAB.newInstance(conf));
}
// create composite immutable segment from a list of segments // create composite immutable segment from a list of segments
// for snapshot consisting of multiple segments
public CompositeImmutableSegment createCompositeImmutableSegment( public CompositeImmutableSegment createCompositeImmutableSegment(
final CellComparator comparator, List<ImmutableSegment> segments) { final CellComparator comparator, List<ImmutableSegment> segments) {
return new CompositeImmutableSegment(comparator, segments); return new CompositeImmutableSegment(comparator, segments);
} }
// create new flat immutable segment from compacting old immutable segments // create new flat immutable segment from compacting old immutable segments
// for compaction
public ImmutableSegment createImmutableSegmentByCompaction(final Configuration conf, public ImmutableSegment createImmutableSegmentByCompaction(final Configuration conf,
final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells, final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
ImmutableSegment.Type segmentType) CompactingMemStore.IndexType idxType)
throws IOException { throws IOException {
Preconditions.checkArgument(segmentType == ImmutableSegment.Type.ARRAY_MAP_BASED,
"wrong immutable segment type");
MemStoreLAB memStoreLAB = MemStoreLAB.newInstance(conf); MemStoreLAB memStoreLAB = MemStoreLAB.newInstance(conf);
return return
// the last parameter "false" means not to merge, but to compact the pipeline createImmutableSegment(
// in order to create the new segment conf,comparator,iterator,memStoreLAB,numOfCells,MemStoreCompactor.Action.COMPACT,idxType);
new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, false);
} }
// create empty immutable segment // create empty immutable segment
// for initializations
public ImmutableSegment createImmutableSegment(CellComparator comparator) { public ImmutableSegment createImmutableSegment(CellComparator comparator) {
MutableSegment segment = generateMutableSegment(null, comparator, null); MutableSegment segment = generateMutableSegment(null, comparator, null);
return createImmutableSegment(segment); return createImmutableSegment(segment);
} }
// create immutable segment from mutable segment // create not-flat immutable segment from mutable segment
public ImmutableSegment createImmutableSegment(MutableSegment segment) { public ImmutableSegment createImmutableSegment(MutableSegment segment) {
return new ImmutableSegment(segment); return new CSLMImmutableSegment(segment);
} }
// create mutable segment // create mutable segment
@ -86,19 +79,58 @@ public final class SegmentFactory {
} }
// create new flat immutable segment from merging old immutable segments // create new flat immutable segment from merging old immutable segments
// for merge
public ImmutableSegment createImmutableSegmentByMerge(final Configuration conf, public ImmutableSegment createImmutableSegmentByMerge(final Configuration conf,
final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells, final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
ImmutableSegment.Type segmentType, List<ImmutableSegment> segments) List<ImmutableSegment> segments, CompactingMemStore.IndexType idxType)
throws IOException { throws IOException {
Preconditions.checkArgument(segmentType == ImmutableSegment.Type.ARRAY_MAP_BASED,
"wrong immutable segment type");
MemStoreLAB memStoreLAB = getMergedMemStoreLAB(conf, segments); MemStoreLAB memStoreLAB = getMergedMemStoreLAB(conf, segments);
return return
// the last parameter "true" means to merge the compaction pipeline createImmutableSegment(
// in order to create the new segment conf,comparator,iterator,memStoreLAB,numOfCells,MemStoreCompactor.Action.MERGE,idxType);
new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, true);
} }
// create flat immutable segment from non-flat immutable segment
// for flattening
public ImmutableSegment createImmutableSegmentByFlattening(
CSLMImmutableSegment segment, CompactingMemStore.IndexType idxType, MemstoreSize memstoreSize) {
ImmutableSegment res = null;
switch (idxType) {
case CHUNK_MAP:
res = new CellChunkImmutableSegment(segment, memstoreSize);
break;
case CSLM_MAP:
assert false; // non-flat segment can not be the result of flattening
break;
case ARRAY_MAP:
res = new CellArrayImmutableSegment(segment, memstoreSize);
break;
}
return res;
}
//****** private methods to instantiate concrete store segments **********// //****** private methods to instantiate concrete store segments **********//
private ImmutableSegment createImmutableSegment(final Configuration conf, final CellComparator comparator,
MemStoreSegmentsIterator iterator, MemStoreLAB memStoreLAB, int numOfCells,
MemStoreCompactor.Action action, CompactingMemStore.IndexType idxType) {
ImmutableSegment res = null;
switch (idxType) {
case CHUNK_MAP:
res = new CellChunkImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, action);
break;
case CSLM_MAP:
assert false; // non-flat segment can not be created here
break;
case ARRAY_MAP:
res = new CellArrayImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, action);
break;
}
return res;
}
private MutableSegment generateMutableSegment(final Configuration conf, CellComparator comparator, private MutableSegment generateMutableSegment(final Configuration conf, CellComparator comparator,
MemStoreLAB memStoreLAB) { MemStoreLAB memStoreLAB) {

View File

@ -369,6 +369,7 @@ public class TestHeapSize {
if (expected != actual) { if (expected != actual) {
ClassSize.estimateBase(cl, true); ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(AtomicLong.class, true); ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true); ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true); ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(TimeRangeTracker.class, true); ClassSize.estimateBase(TimeRangeTracker.class, true);
@ -376,9 +377,28 @@ public class TestHeapSize {
assertEquals(expected, actual); assertEquals(expected, actual);
} }
// ImmutableSegment Deep overhead // ImmutableSegments Deep overhead
cl = ImmutableSegment.class; cl = ImmutableSegment.class;
actual = ImmutableSegment.DEEP_OVERHEAD_CSLM; actual = ImmutableSegment.DEEP_OVERHEAD;
expected = ClassSize.estimateBase(cl, false);
expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(AtomicReference.class, false);
expected += ClassSize.estimateBase(CellSet.class, false);
expected += ClassSize.estimateBase(TimeRangeTracker.class, false);
expected += ClassSize.estimateBase(TimeRange.class, false);
if (expected != actual) {
ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(TimeRangeTracker.class, true);
ClassSize.estimateBase(TimeRange.class, true);
assertEquals(expected, actual);
}
cl = CSLMImmutableSegment.class;
actual = CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
expected = ClassSize.estimateBase(cl, false); expected = ClassSize.estimateBase(cl, false);
expected += 2 * ClassSize.estimateBase(AtomicLong.class, false); expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(AtomicReference.class, false); expected += ClassSize.estimateBase(AtomicReference.class, false);
@ -389,6 +409,7 @@ public class TestHeapSize {
if (expected != actual) { if (expected != actual) {
ClassSize.estimateBase(cl, true); ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(AtomicLong.class, true); ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true); ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true); ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(TimeRangeTracker.class, true); ClassSize.estimateBase(TimeRangeTracker.class, true);
@ -396,7 +417,8 @@ public class TestHeapSize {
ClassSize.estimateBase(ConcurrentSkipListMap.class, true); ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
assertEquals(expected, actual); assertEquals(expected, actual);
} }
actual = ImmutableSegment.DEEP_OVERHEAD_CAM; cl = CellArrayImmutableSegment.class;
actual = CellArrayImmutableSegment.DEEP_OVERHEAD_CAM;
expected = ClassSize.estimateBase(cl, false); expected = ClassSize.estimateBase(cl, false);
expected += 2 * ClassSize.estimateBase(AtomicLong.class, false); expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(AtomicReference.class, false); expected += ClassSize.estimateBase(AtomicReference.class, false);
@ -407,6 +429,7 @@ public class TestHeapSize {
if (expected != actual) { if (expected != actual) {
ClassSize.estimateBase(cl, true); ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(AtomicLong.class, true); ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true); ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true); ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(TimeRangeTracker.class, true); ClassSize.estimateBase(TimeRangeTracker.class, true);

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@ -286,8 +287,8 @@ public class TestCellFlatSet extends TestCase {
ByteBuffer idxBuffer = idxChunk.getData(); // the buffers of the chunks ByteBuffer idxBuffer = idxChunk.getData(); // the buffers of the chunks
ByteBuffer dataBuffer = dataChunk.getData(); ByteBuffer dataBuffer = dataChunk.getData();
int dataOffset = Bytes.SIZEOF_INT; // offset inside data buffer int dataOffset = ChunkCreator.SIZEOF_CHUNK_HEADER; // offset inside data buffer
int idxOffset = Bytes.SIZEOF_INT; // skip the space for chunk ID int idxOffset = ChunkCreator.SIZEOF_CHUNK_HEADER; // skip the space for chunk ID
Cell[] cellArray = asc ? ascCells : descCells; Cell[] cellArray = asc ? ascCells : descCells;
@ -296,16 +297,16 @@ public class TestCellFlatSet extends TestCase {
if (dataOffset + KeyValueUtil.length(kv) > chunkCreator.getChunkSize()) { if (dataOffset + KeyValueUtil.length(kv) > chunkCreator.getChunkSize()) {
dataChunk = chunkCreator.getChunk(); // allocate more data chunks if needed dataChunk = chunkCreator.getChunk(); // allocate more data chunks if needed
dataBuffer = dataChunk.getData(); dataBuffer = dataChunk.getData();
dataOffset = Bytes.SIZEOF_INT; dataOffset = ChunkCreator.SIZEOF_CHUNK_HEADER;
} }
int dataStartOfset = dataOffset; int dataStartOfset = dataOffset;
dataOffset = KeyValueUtil.appendTo(kv, dataBuffer, dataOffset, false); // write deep cell data dataOffset = KeyValueUtil.appendTo(kv, dataBuffer, dataOffset, false); // write deep cell data
// do we have enough space to write the cell-representation on the index chunk? // do we have enough space to write the cell-representation on the index chunk?
if (idxOffset + CellChunkMap.SIZEOF_CELL_REP > chunkCreator.getChunkSize()) { if (idxOffset + ClassSize.CELL_CHUNK_MAP_ENTRY > chunkCreator.getChunkSize()) {
idxChunk = chunkCreator.getChunk(); // allocate more index chunks if needed idxChunk = chunkCreator.getChunk(); // allocate more index chunks if needed
idxBuffer = idxChunk.getData(); idxBuffer = idxChunk.getData();
idxOffset = Bytes.SIZEOF_INT; idxOffset = ChunkCreator.SIZEOF_CHUNK_HEADER;
chunkArray[chunkArrayIdx++] = idxChunk; chunkArray[chunkArrayIdx++] = idxChunk;
} }
idxOffset = ByteBufferUtils.putInt(idxBuffer, idxOffset, dataChunk.getId()); // write data chunk id idxOffset = ByteBufferUtils.putInt(idxBuffer, idxOffset, dataChunk.getId()); // write data chunk id
@ -314,8 +315,6 @@ public class TestCellFlatSet extends TestCase {
idxOffset = ByteBufferUtils.putLong(idxBuffer, idxOffset, kv.getSequenceId()); // seqId idxOffset = ByteBufferUtils.putLong(idxBuffer, idxOffset, kv.getSequenceId()); // seqId
} }
return asc ? return new CellChunkMap(CellComparator.COMPARATOR,chunkArray,0,NUM_OF_CELLS,!asc);
new CellChunkMap(CellComparator.COMPARATOR,chunkArray,0,NUM_OF_CELLS,false) :
new CellChunkMap(CellComparator.COMPARATOR,chunkArray,0,NUM_OF_CELLS,true);
} }
} }

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
@ -98,7 +99,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
.getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null); globalMemStoreLimit, 0.4f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
assertTrue(chunkCreator != null); assertTrue(chunkCreator != null);
} }
@ -563,16 +564,71 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
assertTrue(chunkCreator.getPoolSize() > 0); assertTrue(chunkCreator.getPoolSize() > 0);
} }
@Test
public void testFlatteningToCellChunkMap() throws IOException {
// set memstore to flat into CellChunkMap
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(compactionType));
((CompactingMemStore)memstore).initiateType(compactionType);
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
((CompactingMemStore)memstore).setIndexType();
int numOfCells = 8;
String[] keys1 = { "A", "A", "B", "C", "D", "D", "E", "F" }; //A1, A2, B3, C4, D5, D6, E7, F8
// make one cell
byte[] row = Bytes.toBytes(keys1[0]);
byte[] val = Bytes.toBytes(keys1[0] + 0);
KeyValue kv =
new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
System.currentTimeMillis(), val);
// test 1 bucket
int totalCellsLen = addRowsByKeys(memstore, keys1);
long oneCellOnCSLMHeapSize =
ClassSize.align(
ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + KeyValueUtil
.length(kv));
long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and flatten
assertEquals(0, memstore.getSnapshot().getCellsCount());
// One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode.
// totalCellsLen should remain the same
long oneCellOnCCMHeapSize =
ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
+ numOfCells * oneCellOnCCMHeapSize;
assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
MemstoreSize size = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
region.decrMemstoreSize(size); // simulate flusher
ImmutableSegment s = memstore.getSnapshot();
assertEquals(numOfCells, s.getCellsCount());
assertEquals(0, regionServicesForStores.getMemstoreSize());
memstore.clearSnapshot(snapshot.getId());
}
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
// Compaction tests // Compaction tests
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@Test @Test
public void testCompaction1Bucket() throws IOException { public void testCompaction1Bucket() throws IOException {
// set memstore to do data compaction and not to use the speculative scan // set memstore to do basic structure flattening, the "eager" option is tested in
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER; // TestCompactingToCellFlatMapMemStore
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
String.valueOf(compactionType)); memstore.getConfiguration()
.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(compactionType));
((CompactingMemStore)memstore).initiateType(compactionType); ((CompactingMemStore)memstore).initiateType(compactionType);
String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4 String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4
@ -581,25 +637,24 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
int totalCellsLen = addRowsByKeys(memstore, keys1); int totalCellsLen = addRowsByKeys(memstore, keys1);
int oneCellOnCSLMHeapSize = 120; int oneCellOnCSLMHeapSize = 120;
int oneCellOnCAHeapSize = 88; int oneCellOnCAHeapSize = 88;
long totalHeapSize = 4 * oneCellOnCSLMHeapSize; long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
assertEquals(0, memstore.getSnapshot().getCellsCount());
// There is no compaction, as the compacting memstore type is basic.
// totalCellsLen remains the same
totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
+ 4 * oneCellOnCAHeapSize;
assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize()); assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
MemstoreSize size = memstore.getFlushableSize(); MemstoreSize size = memstore.getFlushableSize();
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
assertEquals(0, memstore.getSnapshot().getCellsCount());
// One cell is duplicated and the compaction will remove it. All cells of same size so adjusting
// totalCellsLen
totalCellsLen = (totalCellsLen * 3) / 4;
totalHeapSize = 3 * oneCellOnCAHeapSize;
assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
size = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
region.decrMemstoreSize(size); // simulate flusher region.decrMemstoreSize(size); // simulate flusher
ImmutableSegment s = memstore.getSnapshot(); ImmutableSegment s = memstore.getSnapshot();
assertEquals(3, s.getCellsCount()); assertEquals(4, s.getCellsCount());
assertEquals(0, regionServicesForStores.getMemstoreSize()); assertEquals(0, regionServicesForStores.getMemstoreSize());
memstore.clearSnapshot(snapshot.getId()); memstore.clearSnapshot(snapshot.getId());
@ -608,8 +663,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
@Test @Test
public void testCompaction2Buckets() throws IOException { public void testCompaction2Buckets() throws IOException {
// set memstore to do data compaction and not to use the speculative scan // set memstore to do basic structure flattening, the "eager" option is tested in
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER; // TestCompactingToCellFlatMapMemStore
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(compactionType)); String.valueOf(compactionType));
((CompactingMemStore)memstore).initiateType(compactionType); ((CompactingMemStore)memstore).initiateType(compactionType);
@ -619,24 +675,23 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
int totalCellsLen1 = addRowsByKeys(memstore, keys1); int totalCellsLen1 = addRowsByKeys(memstore, keys1);
int oneCellOnCSLMHeapSize = 120; int oneCellOnCSLMHeapSize = 120;
int oneCellOnCAHeapSize = 88; int oneCellOnCAHeapSize = 88;
long totalHeapSize = 4 * oneCellOnCSLMHeapSize; long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize()); assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
MemstoreSize size = memstore.getFlushableSize();
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
int counter = 0; int counter = 0;
for ( Segment s : memstore.getSegments()) { for ( Segment s : memstore.getSegments()) {
counter += s.getCellsCount(); counter += s.getCellsCount();
} }
assertEquals(3, counter); assertEquals(4, counter);
assertEquals(0, memstore.getSnapshot().getCellsCount()); assertEquals(0, memstore.getSnapshot().getCellsCount());
// One cell is duplicated and the compaction will remove it. All cells of same time so adjusting // There is no compaction, as the compacting memstore type is basic.
// totalCellsLen // totalCellsLen remains the same
totalCellsLen1 = (totalCellsLen1 * 3) / 4;
assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize()); assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
totalHeapSize = 3 * oneCellOnCAHeapSize; totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
+ 4 * oneCellOnCAHeapSize;
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
int totalCellsLen2 = addRowsByKeys(memstore, keys2); int totalCellsLen2 = addRowsByKeys(memstore, keys2);
@ -644,19 +699,19 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize()); assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
size = memstore.getFlushableSize(); MemstoreSize size = memstore.getFlushableSize();
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
assertEquals(0, memstore.getSnapshot().getCellsCount()); assertEquals(0, memstore.getSnapshot().getCellsCount());
totalCellsLen2 = totalCellsLen2 / 3;// 2 cells duplicated in set 2
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize()); assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
totalHeapSize = 4 * oneCellOnCAHeapSize; totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
+ 7 * oneCellOnCAHeapSize;
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
size = memstore.getFlushableSize(); size = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
region.decrMemstoreSize(size); // simulate flusher region.decrMemstoreSize(size); // simulate flusher
ImmutableSegment s = memstore.getSnapshot(); ImmutableSegment s = memstore.getSnapshot();
assertEquals(4, s.getCellsCount()); assertEquals(7, s.getCellsCount());
assertEquals(0, regionServicesForStores.getMemstoreSize()); assertEquals(0, regionServicesForStores.getMemstoreSize());
memstore.clearSnapshot(snapshot.getId()); memstore.clearSnapshot(snapshot.getId());
@ -678,10 +733,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
int oneCellOnCSLMHeapSize = 120; int oneCellOnCSLMHeapSize = 120;
int oneCellOnCAHeapSize = 88; int oneCellOnCAHeapSize = 88;
assertEquals(totalCellsLen1, region.getMemstoreSize()); assertEquals(totalCellsLen1, region.getMemstoreSize());
long totalHeapSize = 4 * oneCellOnCSLMHeapSize; long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
MemstoreSize size = memstore.getFlushableSize();
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
assertEquals(0, memstore.getSnapshot().getCellsCount()); assertEquals(0, memstore.getSnapshot().getCellsCount());
@ -690,29 +743,31 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
totalCellsLen1 = (totalCellsLen1 * 3) / 4; totalCellsLen1 = (totalCellsLen1 * 3) / 4;
assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize()); assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
// In memory flush to make a CellArrayMap instead of CSLM. See the overhead diff. // In memory flush to make a CellArrayMap instead of CSLM. See the overhead diff.
totalHeapSize = 3 * oneCellOnCAHeapSize; totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
+ 3 * oneCellOnCAHeapSize;
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
int totalCellsLen2 = addRowsByKeys(memstore, keys2);// Adding 3 more cells. int totalCellsLen2 = addRowsByKeys(memstore, keys2);// Adding 3 more cells.
long totalHeapSize2 = 3 * oneCellOnCSLMHeapSize; long totalHeapSize2 = totalHeapSize + 3 * oneCellOnCSLMHeapSize;
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize()); assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
assertEquals(totalHeapSize + totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
((CompactingMemStore) memstore).disableCompaction(); ((CompactingMemStore) memstore).disableCompaction();
size = memstore.getFlushableSize(); MemstoreSize size = memstore.getFlushableSize();
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
assertEquals(0, memstore.getSnapshot().getCellsCount()); assertEquals(0, memstore.getSnapshot().getCellsCount());
// No change in the cells data size. ie. memstore size. as there is no compaction. // No change in the cells data size. ie. memstore size. as there is no compaction.
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize()); assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
assertEquals(totalHeapSize + totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); assertEquals(totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM,
((CompactingMemStore) memstore).heapSize());
int totalCellsLen3 = addRowsByKeys(memstore, keys3);// 3 more cells added int totalCellsLen3 = addRowsByKeys(memstore, keys3);// 3 more cells added
assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
regionServicesForStores.getMemstoreSize()); regionServicesForStores.getMemstoreSize());
long totalHeapSize3 = 3 * oneCellOnCSLMHeapSize; long totalHeapSize3 = totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
assertEquals(totalHeapSize + totalHeapSize2 + totalHeapSize3, + 3 * oneCellOnCSLMHeapSize;
((CompactingMemStore) memstore).heapSize()); assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize());
((CompactingMemStore)memstore).enableCompaction(); ((CompactingMemStore)memstore).enableCompaction();
size = memstore.getFlushableSize(); size = memstore.getFlushableSize();
@ -725,7 +780,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
regionServicesForStores.getMemstoreSize()); regionServicesForStores.getMemstoreSize());
// Only 4 unique cells left // Only 4 unique cells left
assertEquals(4 * oneCellOnCAHeapSize, ((CompactingMemStore) memstore).heapSize()); assertEquals(4 * oneCellOnCAHeapSize + MutableSegment.DEEP_OVERHEAD
+ CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, ((CompactingMemStore) memstore).heapSize());
size = memstore.getFlushableSize(); size = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot

View File

@ -1,492 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.List;
/**
* compacted memstore test case
*/
@Category({RegionServerTests.class, MediumTests.class})
public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore {
private static final Log LOG = LogFactory.getLog(TestCompactingToCellArrayMapMemStore.class);
//////////////////////////////////////////////////////////////////////////////
// Helpers
//////////////////////////////////////////////////////////////////////////////
@Override public void tearDown() throws Exception {
chunkCreator.clearChunksInPool();
}
@Override public void setUp() throws Exception {
compactingSetUp();
Configuration conf = HBaseConfiguration.create();
// set memstore to do data compaction
conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(MemoryCompactionPolicy.EAGER));
this.memstore =
new CompactingMemStore(conf, CellComparator.COMPARATOR, store,
regionServicesForStores, MemoryCompactionPolicy.EAGER);
}
//////////////////////////////////////////////////////////////////////////////
// Compaction tests
//////////////////////////////////////////////////////////////////////////////
public void testCompaction1Bucket() throws IOException {
int counter = 0;
String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4
// test 1 bucket
long totalCellsLen = addRowsByKeys(memstore, keys1);
int oneCellOnCSLMHeapSize = 120;
int oneCellOnCAHeapSize = 88;
long totalHeapSize = 4 * oneCellOnCSLMHeapSize;
assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
assertEquals(4, memstore.getActive().getCellsCount());
MemstoreSize size = memstore.getFlushableSize();
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
assertEquals(0, memstore.getSnapshot().getCellsCount());
// One cell is duplicated and the compaction will remove it. All cells of same size so adjusting
// totalCellsLen
totalCellsLen = (totalCellsLen * 3) / 4;
assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
totalHeapSize = 3 * oneCellOnCAHeapSize;
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
for ( Segment s : memstore.getSegments()) {
counter += s.getCellsCount();
}
assertEquals(3, counter);
size = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
region.decrMemstoreSize(size); // simulate flusher
ImmutableSegment s = memstore.getSnapshot();
assertEquals(3, s.getCellsCount());
assertEquals(0, regionServicesForStores.getMemstoreSize());
memstore.clearSnapshot(snapshot.getId());
}
public void testCompaction2Buckets() throws IOException {
String[] keys1 = { "A", "A", "B", "C" };
String[] keys2 = { "A", "B", "D" };
long totalCellsLen1 = addRowsByKeys(memstore, keys1);
int oneCellOnCSLMHeapSize = 120;
int oneCellOnCAHeapSize = 88;
long totalHeapSize1 = 4 * oneCellOnCSLMHeapSize;
assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
MemstoreSize size = memstore.getFlushableSize();
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
int counter = 0;
for ( Segment s : memstore.getSegments()) {
counter += s.getCellsCount();
}
assertEquals(3,counter);
assertEquals(0, memstore.getSnapshot().getCellsCount());
// One cell is duplicated and the compaction will remove it. All cells of same size so adjusting
// totalCellsLen
totalCellsLen1 = (totalCellsLen1 * 3) / 4;
totalHeapSize1 = 3 * oneCellOnCAHeapSize;
assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
long totalCellsLen2 = addRowsByKeys(memstore, keys2);
long totalHeapSize2 = 3 * oneCellOnCSLMHeapSize;
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
size = memstore.getFlushableSize();
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
assertEquals(0, memstore.getSnapshot().getCellsCount());
counter = 0;
for ( Segment s : memstore.getSegments()) {
counter += s.getCellsCount();
}
assertEquals(4,counter);
totalCellsLen2 = totalCellsLen2 / 3;// 2 cells duplicated in set 2
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
totalHeapSize2 = 1 * oneCellOnCAHeapSize;
assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
size = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
region.decrMemstoreSize(size); // simulate flusher
ImmutableSegment s = memstore.getSnapshot();
assertEquals(4, s.getCellsCount());
assertEquals(0, regionServicesForStores.getMemstoreSize());
memstore.clearSnapshot(snapshot.getId());
}
public void testCompaction3Buckets() throws IOException {
String[] keys1 = { "A", "A", "B", "C" };
String[] keys2 = { "A", "B", "D" };
String[] keys3 = { "D", "B", "B" };
long totalCellsLen1 = addRowsByKeys(memstore, keys1);
int oneCellOnCSLMHeapSize = 120;
int oneCellOnCAHeapSize = 88;
long totalHeapSize1 = 4 * oneCellOnCSLMHeapSize;
assertEquals(totalCellsLen1, region.getMemstoreSize());
assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
MemstoreSize size = memstore.getFlushableSize();
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
assertEquals(0, memstore.getSnapshot().getCellsCount());
// One cell is duplicated and the compaction will remove it. All cells of same size so adjusting
// totalCellsLen
totalCellsLen1 = (totalCellsLen1 * 3) / 4;
totalHeapSize1 = 3 * oneCellOnCAHeapSize;
assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
long totalCellsLen2 = addRowsByKeys(memstore, keys2);
long totalHeapSize2 = 3 * oneCellOnCSLMHeapSize;
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
((CompactingMemStore) memstore).disableCompaction();
size = memstore.getFlushableSize();
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
assertEquals(0, memstore.getSnapshot().getCellsCount());
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
long totalCellsLen3 = addRowsByKeys(memstore, keys3);
long totalHeapSize3 = 3 * oneCellOnCSLMHeapSize;
assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
regionServicesForStores.getMemstoreSize());
assertEquals(totalHeapSize1 + totalHeapSize2 + totalHeapSize3,
((CompactingMemStore) memstore).heapSize());
((CompactingMemStore) memstore).enableCompaction();
size = memstore.getFlushableSize();
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
Threads.sleep(10);
}
assertEquals(0, memstore.getSnapshot().getCellsCount());
// active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells.
// Out of total 10, only 4 cells are unique
totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated
totalCellsLen3 = 0;// All duplicated cells.
assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
regionServicesForStores.getMemstoreSize());
// Only 4 unique cells left
assertEquals(4 * oneCellOnCAHeapSize, ((CompactingMemStore) memstore).heapSize());
size = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
region.decrMemstoreSize(size); // simulate flusher
ImmutableSegment s = memstore.getSnapshot();
assertEquals(4, s.getCellsCount());
assertEquals(0, regionServicesForStores.getMemstoreSize());
memstore.clearSnapshot(snapshot.getId());
}
//////////////////////////////////////////////////////////////////////////////
// Merging tests
//////////////////////////////////////////////////////////////////////////////
@Test
public void testMerging() throws IOException {
String[] keys1 = { "A", "A", "B", "C", "F", "H"};
String[] keys2 = { "A", "B", "D", "G", "I", "J"};
String[] keys3 = { "D", "B", "B", "E" };
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(compactionType));
((CompactingMemStore)memstore).initiateType(compactionType);
addRowsByKeys(memstore, keys1);
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline should not compact
while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
Threads.sleep(10);
}
assertEquals(0, memstore.getSnapshot().getCellsCount());
addRowsByKeys(memstore, keys2); // also should only flatten
int counter2 = 0;
for ( Segment s : memstore.getSegments()) {
counter2 += s.getCellsCount();
}
assertEquals(12, counter2);
((CompactingMemStore) memstore).disableCompaction();
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening
assertEquals(0, memstore.getSnapshot().getCellsCount());
int counter3 = 0;
for ( Segment s : memstore.getSegments()) {
counter3 += s.getCellsCount();
}
assertEquals(12, counter3);
addRowsByKeys(memstore, keys3);
int counter4 = 0;
for ( Segment s : memstore.getSegments()) {
counter4 += s.getCellsCount();
}
assertEquals(16, counter4);
((CompactingMemStore) memstore).enableCompaction();
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
Threads.sleep(10);
}
assertEquals(0, memstore.getSnapshot().getCellsCount());
int counter = 0;
for ( Segment s : memstore.getSegments()) {
counter += s.getCellsCount();
}
assertEquals(16,counter);
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
ImmutableSegment s = memstore.getSnapshot();
memstore.clearSnapshot(snapshot.getId());
}
@Test
public void testCountOfCellsAfterFlatteningByScan() throws IOException {
String[] keys1 = { "A", "B", "C" }; // A, B, C
addRowsByKeysWith50Cols(memstore, keys1);
// this should only flatten as there are no duplicates
((CompactingMemStore) memstore).flushInMemory();
while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
Threads.sleep(10);
}
List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
// seek
int count = 0;
for(int i = 0; i < scanners.size(); i++) {
scanners.get(i).seek(KeyValue.LOWESTKEY);
while (scanners.get(i).next() != null) {
count++;
}
}
assertEquals("the count should be ", count, 150);
for(int i = 0; i < scanners.size(); i++) {
scanners.get(i).close();
}
}
@Test
public void testCountOfCellsAfterFlatteningByIterator() throws IOException {
String[] keys1 = { "A", "B", "C" }; // A, B, C
addRowsByKeysWith50Cols(memstore, keys1);
// this should only flatten as there are no duplicates
((CompactingMemStore) memstore).flushInMemory();
while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
Threads.sleep(10);
}
// Just doing the cnt operation here
MemStoreSegmentsIterator itr = new MemStoreMergerSegmentsIterator(
((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(),
CellComparator.COMPARATOR, 10);
int cnt = 0;
try {
while (itr.next() != null) {
cnt++;
}
} finally {
itr.close();
}
assertEquals("the count should be ", cnt, 150);
}
private void addRowsByKeysWith50Cols(AbstractMemStore hmc, String[] keys) {
byte[] fam = Bytes.toBytes("testfamily");
for (int i = 0; i < keys.length; i++) {
long timestamp = System.currentTimeMillis();
Threads.sleep(1); // to make sure each kv gets a different ts
byte[] row = Bytes.toBytes(keys[i]);
for(int j =0 ;j < 50; j++) {
byte[] qf = Bytes.toBytes("testqualifier"+j);
byte[] val = Bytes.toBytes(keys[i] + j);
KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
hmc.add(kv, null);
}
}
}
@Override
@Test
public void testPuttingBackChunksWithOpeningScanner() throws IOException {
byte[] row = Bytes.toBytes("testrow");
byte[] fam = Bytes.toBytes("testfamily");
byte[] qf1 = Bytes.toBytes("testqualifier1");
byte[] qf2 = Bytes.toBytes("testqualifier2");
byte[] qf3 = Bytes.toBytes("testqualifier3");
byte[] qf4 = Bytes.toBytes("testqualifier4");
byte[] qf5 = Bytes.toBytes("testqualifier5");
byte[] qf6 = Bytes.toBytes("testqualifier6");
byte[] qf7 = Bytes.toBytes("testqualifier7");
byte[] val = Bytes.toBytes("testval");
// Setting up memstore
memstore.add(new KeyValue(row, fam, qf1, val), null);
memstore.add(new KeyValue(row, fam, qf2, val), null);
memstore.add(new KeyValue(row, fam, qf3, val), null);
// Creating a snapshot
MemStoreSnapshot snapshot = memstore.snapshot();
assertEquals(3, memstore.getSnapshot().getCellsCount());
// Adding value to "new" memstore
assertEquals(0, memstore.getActive().getCellsCount());
memstore.add(new KeyValue(row, fam, qf4, val), null);
memstore.add(new KeyValue(row, fam, qf5, val), null);
assertEquals(2, memstore.getActive().getCellsCount());
// opening scanner before clear the snapshot
List<KeyValueScanner> scanners = memstore.getScanners(0);
// Shouldn't putting back the chunks to pool,since some scanners are opening
// based on their data
// close the scanners
for(KeyValueScanner scanner : snapshot.getScanners()) {
scanner.close();
}
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkCreator.getPoolSize() == 0);
// Chunks will be put back to pool after close scanners;
for (KeyValueScanner scanner : scanners) {
scanner.close();
}
assertTrue(chunkCreator.getPoolSize() > 0);
// clear chunks
chunkCreator.clearChunksInPool();
// Creating another snapshot
snapshot = memstore.snapshot();
// Adding more value
memstore.add(new KeyValue(row, fam, qf6, val), null);
memstore.add(new KeyValue(row, fam, qf7, val), null);
// opening scanners
scanners = memstore.getScanners(0);
// close scanners before clear the snapshot
for (KeyValueScanner scanner : scanners) {
scanner.close();
}
// Since no opening scanner, the chunks of snapshot should be put back to
// pool
// close the scanners
for(KeyValueScanner scanner : snapshot.getScanners()) {
scanner.close();
}
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkCreator.getPoolSize() > 0);
}
@Test
public void testPuttingBackChunksAfterFlushing() throws IOException {
byte[] row = Bytes.toBytes("testrow");
byte[] fam = Bytes.toBytes("testfamily");
byte[] qf1 = Bytes.toBytes("testqualifier1");
byte[] qf2 = Bytes.toBytes("testqualifier2");
byte[] qf3 = Bytes.toBytes("testqualifier3");
byte[] qf4 = Bytes.toBytes("testqualifier4");
byte[] qf5 = Bytes.toBytes("testqualifier5");
byte[] val = Bytes.toBytes("testval");
// Setting up memstore
memstore.add(new KeyValue(row, fam, qf1, val), null);
memstore.add(new KeyValue(row, fam, qf2, val), null);
memstore.add(new KeyValue(row, fam, qf3, val), null);
// Creating a snapshot
MemStoreSnapshot snapshot = memstore.snapshot();
assertEquals(3, memstore.getSnapshot().getCellsCount());
// Adding value to "new" memstore
assertEquals(0, memstore.getActive().getCellsCount());
memstore.add(new KeyValue(row, fam, qf4, val), null);
memstore.add(new KeyValue(row, fam, qf5, val), null);
assertEquals(2, memstore.getActive().getCellsCount());
// close the scanners
for(KeyValueScanner scanner : snapshot.getScanners()) {
scanner.close();
}
memstore.clearSnapshot(snapshot.getId());
int chunkCount = chunkCreator.getPoolSize();
assertTrue(chunkCount > 0);
}
private long addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
byte[] fam = Bytes.toBytes("testfamily");
byte[] qf = Bytes.toBytes("testqualifier");
MemstoreSize memstoreSize = new MemstoreSize();
for (int i = 0; i < keys.length; i++) {
long timestamp = System.currentTimeMillis();
Threads.sleep(1); // to make sure each kv gets a different ts
byte[] row = Bytes.toBytes(keys[i]);
byte[] val = Bytes.toBytes(keys[i] + i);
KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
hmc.add(kv, memstoreSize);
LOG.debug("added kv: " + kv.getKeyString() + ", timestamp" + kv.getTimestamp());
}
regionServicesForStores.addMemstoreSize(memstoreSize);
return memstoreSize.getDataSize();
}
}

View File

@ -954,9 +954,12 @@ public class TestDefaultMemStore {
conf, FSTableDescriptors.createMetaTableDescriptor(conf), conf, FSTableDescriptors.createMetaTableDescriptor(conf),
wFactory.getMetaWAL(HRegionInfo.FIRST_META_REGIONINFO. wFactory.getMetaWAL(HRegionInfo.FIRST_META_REGIONINFO.
getEncodedNameAsBytes())); getEncodedNameAsBytes()));
HRegionInfo hri = new HRegionInfo(TableName.valueOf(name.getMethodName()), // parameterized tests add [#] suffix get rid of [ and ].
HRegionInfo hri =
new HRegionInfo(TableName.valueOf(name.getMethodName().replaceAll("[\\[\\]]", "_")),
Bytes.toBytes("row_0200"), Bytes.toBytes("row_0300")); Bytes.toBytes("row_0200"), Bytes.toBytes("row_0300"));
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name.getMethodName())); HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(
name.getMethodName().replaceAll("[\\[\\]]", "_")));
desc.addFamily(new HColumnDescriptor("foo".getBytes())); desc.addFamily(new HColumnDescriptor("foo".getBytes()));
HRegion r = HRegion r =
HRegion.createHRegion(hri, testDir, conf, desc, HRegion.createHRegion(hri, testDir, conf, desc,

View File

@ -788,7 +788,8 @@ public class TestHRegionReplayEvents {
HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
long newFlushableSize = store.getFlushableSize().getHeapSize(); long newFlushableSize = store.getFlushableSize().getHeapSize();
if (droppableMemstore) { if (droppableMemstore) {
assertTrue(newFlushableSize == 0); // assert that the memstore is dropped // assert that the memstore is dropped
assertTrue(newFlushableSize == MutableSegment.DEEP_OVERHEAD);
} else { } else {
assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped
} }
@ -876,9 +877,9 @@ public class TestHRegionReplayEvents {
for (HStore s : secondaryRegion.getStores()) { for (HStore s : secondaryRegion.getStores()) {
assertEquals(expectedStoreFileCount, s.getStorefilesCount()); assertEquals(expectedStoreFileCount, s.getStorefilesCount());
} }
HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
long newFlushableSize = store.getFlushableSize().getHeapSize(); long newFlushableSize = store.getFlushableSize().getHeapSize();
assertTrue(newFlushableSize == 0); assertTrue(newFlushableSize == MutableSegment.DEEP_OVERHEAD);
// assert that the region memstore is empty // assert that the region memstore is empty
long newRegionMemstoreSize = secondaryRegion.getMemstoreSize(); long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();

View File

@ -254,6 +254,8 @@ public class TestHStore {
LOG.info("Adding some data"); LOG.info("Adding some data");
MemstoreSize kvSize = new MemstoreSize(); MemstoreSize kvSize = new MemstoreSize();
store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize); store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
// add the heap size of active (mutable) segment
kvSize.incMemstoreSize(0, MutableSegment.DEEP_OVERHEAD);
size = store.memstore.getFlushableSize(); size = store.memstore.getFlushableSize();
assertEquals(kvSize, size); assertEquals(kvSize, size);
// Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right. // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right.
@ -264,10 +266,14 @@ public class TestHStore {
} catch (IOException ioe) { } catch (IOException ioe) {
assertTrue(ioe.getMessage().contains("Fault injected")); assertTrue(ioe.getMessage().contains("Fault injected"));
} }
// due to snapshot, change mutable to immutable segment
kvSize.incMemstoreSize(0,
CSLMImmutableSegment.DEEP_OVERHEAD_CSLM-MutableSegment.DEEP_OVERHEAD);
size = store.memstore.getFlushableSize(); size = store.memstore.getFlushableSize();
assertEquals(kvSize, size); assertEquals(kvSize, size);
MemstoreSize kvSize2 = new MemstoreSize(); MemstoreSize kvSize2 = new MemstoreSize();
store.add(new KeyValue(row, family, qf2, 2, (byte[])null), kvSize2); store.add(new KeyValue(row, family, qf2, 2, (byte[])null), kvSize2);
kvSize2.incMemstoreSize(0, MutableSegment.DEEP_OVERHEAD);
// Even though we add a new kv, we expect the flushable size to be 'same' since we have // Even though we add a new kv, we expect the flushable size to be 'same' since we have
// not yet cleared the snapshot -- the above flush failed. // not yet cleared the snapshot -- the above flush failed.
assertEquals(kvSize, size); assertEquals(kvSize, size);
@ -279,7 +285,7 @@ public class TestHStore {
flushStore(store, id++); flushStore(store, id++);
size = store.memstore.getFlushableSize(); size = store.memstore.getFlushableSize();
assertEquals(0, size.getDataSize()); assertEquals(0, size.getDataSize());
assertEquals(0, size.getHeapSize()); assertEquals(MutableSegment.DEEP_OVERHEAD, size.getHeapSize());
return null; return null;
} }
}); });

View File

@ -194,7 +194,7 @@ public class TestPerColumnFamilyFlush {
// We should have cleared out only CF1, since we chose the flush thresholds // We should have cleared out only CF1, since we chose the flush thresholds
// and number of puts accordingly. // and number of puts accordingly.
assertEquals(0, cf1MemstoreSize.getDataSize()); assertEquals(0, cf1MemstoreSize.getDataSize());
assertEquals(0, cf1MemstoreSize.getHeapSize()); assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
// Nothing should have happened to CF2, ... // Nothing should have happened to CF2, ...
assertEquals(cf2MemstoreSize, oldCF2MemstoreSize); assertEquals(cf2MemstoreSize, oldCF2MemstoreSize);
// ... or CF3 // ... or CF3
@ -231,9 +231,9 @@ public class TestPerColumnFamilyFlush {
// CF1 and CF2, both should be absent. // CF1 and CF2, both should be absent.
assertEquals(0, cf1MemstoreSize.getDataSize()); assertEquals(0, cf1MemstoreSize.getDataSize());
assertEquals(0, cf1MemstoreSize.getHeapSize()); assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
assertEquals(0, cf2MemstoreSize.getDataSize()); assertEquals(0, cf2MemstoreSize.getDataSize());
assertEquals(0, cf2MemstoreSize.getHeapSize()); assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize());
// CF3 shouldn't have been touched. // CF3 shouldn't have been touched.
assertEquals(cf3MemstoreSize, oldCF3MemstoreSize); assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
assertEquals(totalMemstoreSize, cf3MemstoreSize.getDataSize()); assertEquals(totalMemstoreSize, cf3MemstoreSize.getDataSize());
@ -314,11 +314,11 @@ public class TestPerColumnFamilyFlush {
// Everything should have been cleared // Everything should have been cleared
assertEquals(0, cf1MemstoreSize.getDataSize()); assertEquals(0, cf1MemstoreSize.getDataSize());
assertEquals(0, cf1MemstoreSize.getHeapSize()); assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
assertEquals(0, cf2MemstoreSize.getDataSize()); assertEquals(0, cf2MemstoreSize.getDataSize());
assertEquals(0, cf2MemstoreSize.getHeapSize()); assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize());
assertEquals(0, cf3MemstoreSize.getDataSize()); assertEquals(0, cf3MemstoreSize.getDataSize());
assertEquals(0, cf3MemstoreSize.getHeapSize()); assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSize.getHeapSize());
assertEquals(0, totalMemstoreSize); assertEquals(0, totalMemstoreSize);
assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore); assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore);
HBaseTestingUtility.closeRegionAndWAL(region); HBaseTestingUtility.closeRegionAndWAL(region);
@ -525,9 +525,12 @@ public class TestPerColumnFamilyFlush {
}); });
LOG.info("Finished waiting on flush after too many WALs..."); LOG.info("Finished waiting on flush after too many WALs...");
// Individual families should have been flushed. // Individual families should have been flushed.
assertEquals(0, desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize()); assertEquals(MutableSegment.DEEP_OVERHEAD,
assertEquals(0, desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize()); desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize());
assertEquals(0, desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize()); assertEquals(MutableSegment.DEEP_OVERHEAD,
desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize());
assertEquals(MutableSegment.DEEP_OVERHEAD,
desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize());
// let WAL cleanOldLogs // let WAL cleanOldLogs
assertNull(getWAL(desiredRegion).rollWriter(true)); assertNull(getWAL(desiredRegion).rollWriter(true));
assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs); assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs);

View File

@ -241,7 +241,7 @@ public class TestWalAndCompactingMemStoreFlush {
// CF2 should become empty // CF2 should become empty
assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
assertEquals(0, cf2MemstoreSizePhaseII.getHeapSize()); assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize());
// verify that CF3 was flushed to memory and was compacted (this is approximation check) // verify that CF3 was flushed to memory and was compacted (this is approximation check)
assertTrue(cf3MemstoreSizePhaseI.getDataSize() > cf3MemstoreSizePhaseII.getDataSize()); assertTrue(cf3MemstoreSizePhaseI.getDataSize() > cf3MemstoreSizePhaseII.getDataSize());
@ -302,7 +302,7 @@ public class TestWalAndCompactingMemStoreFlush {
// CF2 should be flushed to disk // CF2 should be flushed to disk
assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize()); assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize());
assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize()); assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize());
assertEquals(0, cf2MemstoreSizePhaseIV.getHeapSize()); assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize());
// CF3 shouldn't have been touched. // CF3 shouldn't have been touched.
assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII); assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
@ -326,11 +326,11 @@ public class TestWalAndCompactingMemStoreFlush {
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
assertEquals(0, cf1MemstoreSizePhaseV.getDataSize()); assertEquals(0, cf1MemstoreSizePhaseV.getDataSize());
assertEquals(0, cf1MemstoreSizePhaseV.getHeapSize()); assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize());
assertEquals(0, cf2MemstoreSizePhaseV.getDataSize()); assertEquals(0, cf2MemstoreSizePhaseV.getDataSize());
assertEquals(0, cf2MemstoreSizePhaseV.getHeapSize()); assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize());
assertEquals(0, cf3MemstoreSizePhaseV.getDataSize()); assertEquals(0, cf3MemstoreSizePhaseV.getDataSize());
assertEquals(0, cf3MemstoreSizePhaseV.getHeapSize()); assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize());
// What happens when we hit the memstore limit, but we are not able to find // What happens when we hit the memstore limit, but we are not able to find
// any Column Family above the threshold? // any Column Family above the threshold?
@ -476,7 +476,7 @@ public class TestWalAndCompactingMemStoreFlush {
assertTrue(cf1MemstoreSizePhaseII.getHeapSize() < cf1MemstoreSizePhaseI.getHeapSize()); assertTrue(cf1MemstoreSizePhaseII.getHeapSize() < cf1MemstoreSizePhaseI.getHeapSize());
// CF2 should become empty // CF2 should become empty
assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
assertEquals(0, cf2MemstoreSizePhaseII.getHeapSize()); assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize());
// verify that CF3 was flushed to memory and was not compacted (this is an approximation check) // verify that CF3 was flushed to memory and was not compacted (this is an approximation check)
// if compacted CF# should be at least twice less because its every key was duplicated // if compacted CF# should be at least twice less because its every key was duplicated
assertEquals(cf3MemstoreSizePhaseII.getDataSize() , cf3MemstoreSizePhaseI.getDataSize()); assertEquals(cf3MemstoreSizePhaseII.getDataSize() , cf3MemstoreSizePhaseI.getDataSize());
@ -544,7 +544,7 @@ public class TestWalAndCompactingMemStoreFlush {
// CF2 should remain empty // CF2 should remain empty
assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize()); assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize());
assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize()); assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize());
assertEquals(0, cf2MemstoreSizePhaseIV.getHeapSize()); assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize());
// CF3 shouldn't have been touched. // CF3 shouldn't have been touched.
assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII); assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
// the smallest LSN of CF3 shouldn't change // the smallest LSN of CF3 shouldn't change
@ -573,11 +573,11 @@ public class TestWalAndCompactingMemStoreFlush {
/*------------------------------------------------------------------------------*/ /*------------------------------------------------------------------------------*/
/* PHASE V - validation */ /* PHASE V - validation */
assertEquals(0, cf1MemstoreSizePhaseV.getDataSize()); assertEquals(0, cf1MemstoreSizePhaseV.getDataSize());
assertEquals(0, cf1MemstoreSizePhaseV.getHeapSize()); assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize());
assertEquals(0, cf2MemstoreSizePhaseV.getDataSize()); assertEquals(0, cf2MemstoreSizePhaseV.getDataSize());
assertEquals(0, cf2MemstoreSizePhaseV.getHeapSize()); assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize());
assertEquals(0, cf3MemstoreSizePhaseV.getDataSize()); assertEquals(0, cf3MemstoreSizePhaseV.getDataSize());
assertEquals(0, cf3MemstoreSizePhaseV.getHeapSize()); assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize());
// The total memstores size should be empty // The total memstores size should be empty
assertEquals(0, totalMemstoreSizePhaseV); assertEquals(0, totalMemstoreSizePhaseV);
// Because there is nothing in any memstore the WAL's LSN should be -1 // Because there is nothing in any memstore the WAL's LSN should be -1
@ -699,7 +699,7 @@ public class TestWalAndCompactingMemStoreFlush {
// CF2 should have been cleared // CF2 should have been cleared
assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
assertEquals(0, cf2MemstoreSizePhaseII.getHeapSize()); assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize());
String s = "\n\n----------------------------------\n" String s = "\n\n----------------------------------\n"
+ "Upon initial insert and flush, LSN of CF1 is:" + "Upon initial insert and flush, LSN of CF1 is:"
@ -875,9 +875,13 @@ public class TestWalAndCompactingMemStoreFlush {
MemstoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize(); MemstoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
assertEquals(2*cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseIV.getDataSize()); assertEquals(2*cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseIV.getDataSize());
// the decrease in the heap size due to usage of CellArrayMap instead of CSLM
// should be the same in flattening and in merge (first and second in-memory-flush)
// but in phase 1 we do not yet have immutable segment
assertEquals( assertEquals(
cf1MemstoreSizePhaseI.getHeapSize() - cf1MemstoreSizePhaseII.getHeapSize(), cf1MemstoreSizePhaseI.getHeapSize() - cf1MemstoreSizePhaseII.getHeapSize(),
cf1MemstoreSizePhaseIII.getHeapSize() - cf1MemstoreSizePhaseIV.getHeapSize()); cf1MemstoreSizePhaseIII.getHeapSize() - cf1MemstoreSizePhaseIV.getHeapSize()
- CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
assertEquals(3, // active, one in pipeline, snapshot assertEquals(3, // active, one in pipeline, snapshot
((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).getSegments().size()); ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).getSegments().size());
// CF2 should have been cleared // CF2 should have been cleared