HBASE-18232: Support Jumbo Chunks for CellChunkMap

This commit is contained in:
anastas 2017-10-31 14:42:52 +02:00
parent 5000652e5e
commit 2f29bbb373
8 changed files with 248 additions and 102 deletions

View File

@ -85,6 +85,10 @@ public abstract class Chunk {
return this.fromPool;
}
boolean isJumbo() {
return size > ChunkCreator.getInstance().getChunkSize();
}
/**
* Actually claim the memory for this chunk. This should only be called from the thread that
* constructed the chunk. It is thread-safe against other threads calling alloc(), who will block

View File

@ -110,17 +110,27 @@ public class ChunkCreator {
* @return the chunk that was initialized
*/
Chunk getChunk() {
return getChunk(CompactingMemStore.IndexType.ARRAY_MAP);
return getChunk(CompactingMemStore.IndexType.ARRAY_MAP, chunkSize);
}
/**
* Creates and inits a chunk. The default implementation for specific index type.
* @return the chunk that was initialized
*/
Chunk getChunk(CompactingMemStore.IndexType chunkIndexType) {
return getChunk(chunkIndexType, chunkSize);
}
/**
* Creates and inits a chunk.
* @return the chunk that was initialized
* @param chunkIndexType whether the requested chunk is going to be used with CellChunkMap index
* @param size the size of the chunk to be allocated, in bytes
*/
Chunk getChunk(CompactingMemStore.IndexType chunkIndexType) {
Chunk getChunk(CompactingMemStore.IndexType chunkIndexType, int size) {
Chunk chunk = null;
if (pool != null) {
// if we have pool and this is not jumbo chunk (when size != chunkSize this is jumbo chunk)
if ((pool != null) && (size == chunkSize)) {
// the pool creates the chunk internally. The chunk#init() call happens here
chunk = this.pool.getChunk();
// the pool has run out of maxCount
@ -132,9 +142,9 @@ public class ChunkCreator {
}
}
if (chunk == null) {
// the second boolean parameter means:
// if CellChunkMap index is requested, put allocated on demand chunk mapping into chunkIdMap
chunk = createChunk(false, chunkIndexType);
// the second parameter explains whether CellChunkMap index is requested,
// in that case, put allocated on demand chunk mapping into chunkIdMap
chunk = createChunk(false, chunkIndexType, size);
}
// now we need to actually do the expensive memory allocation step in case of a new chunk,
@ -143,21 +153,38 @@ public class ChunkCreator {
return chunk;
}
/**
* Creates and inits a chunk of a special size, bigger than a regular chunk size.
* Such a chunk will never come from pool and will always be on demand allocated.
* @return the chunk that was initialized
* @param chunkIndexType whether the requested chunk is going to be used with CellChunkMap index
* @param jumboSize the special size to be used
*/
Chunk getJumboChunk(CompactingMemStore.IndexType chunkIndexType, int jumboSize) {
if (jumboSize <= chunkSize) {
LOG.warn("Jumbo chunk size " + jumboSize + " must be more than regular chunk size "
+ chunkSize + ". Converting to regular chunk.");
getChunk(chunkIndexType,chunkSize);
}
return getChunk(chunkIndexType, jumboSize);
}
/**
* Creates the chunk either onheap or offheap
* @param pool indicates if the chunks have to be created which will be used by the Pool
* @param chunkIndexType
* @param chunkIndexType whether the requested chunk is going to be used with CellChunkMap index
* @param size the size of the chunk to be allocated, in bytes
* @return the chunk
*/
private Chunk createChunk(boolean pool, CompactingMemStore.IndexType chunkIndexType) {
private Chunk createChunk(boolean pool, CompactingMemStore.IndexType chunkIndexType, int size) {
Chunk chunk = null;
int id = chunkID.getAndIncrement();
assert id > 0;
// do not create offheap chunk on demand
if (pool && this.offheap) {
chunk = new OffheapChunk(chunkSize, id, pool);
chunk = new OffheapChunk(size, id, pool);
} else {
chunk = new OnheapChunk(chunkSize, id, pool);
chunk = new OnheapChunk(size, id, pool);
}
if (pool || (chunkIndexType == CompactingMemStore.IndexType.CHUNK_MAP)) {
// put the pool chunk into the chunkIdMap so it is not GC-ed
@ -166,6 +193,12 @@ public class ChunkCreator {
return chunk;
}
// Chunks from pool are created covered with strong references anyway
// TODO: change to CHUNK_MAP if it is generally defined
private Chunk createChunkForPool() {
return createChunk(true, CompactingMemStore.IndexType.ARRAY_MAP, chunkSize);
}
@VisibleForTesting
// Used to translate the ChunkID into a chunk ref
Chunk getChunk(int id) {
@ -227,9 +260,7 @@ public class ChunkCreator {
this.poolSizePercentage = poolSizePercentage;
this.reclaimedChunks = new LinkedBlockingQueue<>();
for (int i = 0; i < initialCount; i++) {
// Chunks from pool are covered with strong references anyway
// TODO: change to CHUNK_MAP if it is generally defined
Chunk chunk = createChunk(true, CompactingMemStore.IndexType.ARRAY_MAP);
Chunk chunk = createChunkForPool();
chunk.init();
reclaimedChunks.add(chunk);
}
@ -261,8 +292,7 @@ public class ChunkCreator {
long created = this.chunkCount.get();
if (created < this.maxCount) {
if (this.chunkCount.compareAndSet(created, created + 1)) {
// TODO: change to CHUNK_MAP if it is generally defined
chunk = createChunk(true, CompactingMemStore.IndexType.ARRAY_MAP);
chunk = createChunkForPool();
break;
}
} else {
@ -433,8 +463,15 @@ public class ChunkCreator {
// this translation will (most likely) return null
Chunk chunk = ChunkCreator.this.getChunk(chunkID);
if (chunk != null) {
// Jumbo chunks are covered with chunkIdMap, but are not from pool, so such a chunk should
// be released here without going to pool.
// Removing them from chunkIdMap will cause their removal by the GC.
if (chunk.isJumbo()) {
this.removeChunk(chunkID);
} else {
pool.putbackChunks(chunk);
}
}
// if chunk is null, it was never covered by the chunkIdMap (and so wasn't in pool also),
// so we have nothing to do on its release
}

View File

@ -45,15 +45,34 @@ public class ImmutableMemStoreLAB implements MemStoreLAB {
throw new IllegalStateException("This is an Immutable MemStoreLAB.");
}
/* Creating chunk to be used as index chunk in CellChunkMap, part of the chunks array.
** Returning a new chunk, without replacing current chunk,
** meaning MSLABImpl does not make the returned chunk as CurChunk.
** The space on this chunk will be allocated externally.
** The interface is only for external callers
*/
@Override
// returning a new chunk, without replacing current chunk,
// the space on this chunk will be allocated externally
// use the first MemStoreLABImpl in the list
public Chunk getNewExternalChunk() {
MemStoreLAB mslab = this.mslabs.get(0);
return mslab.getNewExternalChunk();
}
/* Creating chunk to be used as data chunk in CellChunkMap.
** This chunk is bigger the normal constant chunk size, and thus called JumboChunk it is used for
** jumbo cells (which size is bigger than normal chunks).
** Jumbo Chunks are needed only for CCM and thus are created only in
** CompactingMemStore.IndexType.CHUNK_MAP type.
** Returning a new chunk, without replacing current chunk,
** meaning MSLABImpl does not make the returned chunk as CurChunk.
** The space on this chunk will be allocated externally.
** The interface is only for external callers
*/
@Override
public Chunk getNewExternalJumboChunk(int size) {
MemStoreLAB mslab = this.mslabs.get(0);
return mslab.getNewExternalJumboChunk(size);
}
@Override
public void close() {
// 'openScannerCount' here tracks the scanners opened on segments which directly refer to this

View File

@ -83,12 +83,26 @@ public interface MemStoreLAB {
*/
void decScannerCount();
/**
* Return a new empty chunk without considering this chunk as current
* The space on this chunk will be allocated externally
/* Creating chunk to be used as index chunk in CellChunkMap, part of the chunks array.
** Returning a new chunk, without replacing current chunk,
** meaning MSLABImpl does not make the returned chunk as CurChunk.
** The space on this chunk will be allocated externally.
** The interface is only for external callers
*/
Chunk getNewExternalChunk();
/* Creating chunk to be used as data chunk in CellChunkMap.
** This chunk is bigger the normal constant chunk size, and thus called JumboChunk it is used for
** jumbo cells (which size is bigger than normal chunks).
** Jumbo Chunks are needed only for CCM and thus are created only in
** CompactingMemStore.IndexType.CHUNK_MAP type.
** Returning a new chunk, without replacing current chunk,
** meaning MSLABImpl does not make the returned chunk as CurChunk.
** The space on this chunk will be allocated externally.
** The interface is only for external callers
*/
Chunk getNewExternalJumboChunk(int size);
public static MemStoreLAB newInstance(Configuration conf) {
MemStoreLAB memStoreLAB = null;
if (isEnabled(conf)) {

View File

@ -271,6 +271,26 @@ public class MemStoreLABImpl implements MemStoreLAB {
return c;
}
/* Creating chunk to be used as data chunk in CellChunkMap.
** This chunk is bigger than normal constant chunk size, and thus called JumboChunk.
** JumboChunk is used for jumbo cell (which size is bigger than normal chunk). It is allocated
** once per cell. So even if there is space this is not reused.
** Jumbo Chunks are used only for CCM and thus are created only in
** CompactingMemStore.IndexType.CHUNK_MAP type.
** Returning a new chunk, without replacing current chunk,
** meaning MSLABImpl does not make the returned chunk as CurChunk.
** The space on this chunk will be allocated externally.
** The interface is only for external callers
*/
@Override
public Chunk getNewExternalJumboChunk(int size) {
// the new chunk is going to hold the jumbo cell data and need to be referenced by a strong map
// thus giving the CCM index type
Chunk c = this.chunkCreator.getJumboChunk(CompactingMemStore.IndexType.CHUNK_MAP, size);
chunks.add(c.getId());
return c;
}
@VisibleForTesting
Chunk getCurrentChunk() {
return this.curChunk.get();

View File

@ -58,6 +58,7 @@ public class TestCellFlatSet extends TestCase {
return new Object[] { "SMALL_CHUNKS", "NORMAL_CHUNKS" }; // test with different chunk sizes
}
private static final int NUM_OF_CELLS = 4;
private static final int SMALL_CHUNK_SIZE = 64;
private Cell ascCells[];
private CellArrayMap ascCbOnHeap;
private Cell descCells[];
@ -69,8 +70,7 @@ public class TestCellFlatSet extends TestCase {
private CellChunkMap ascCCM; // for testing ascending CellChunkMap with one chunk in array
private CellChunkMap descCCM; // for testing descending CellChunkMap with one chunk in array
private CellChunkMap ascMultCCM; // testing ascending CellChunkMap with multiple chunks in array
private CellChunkMap descMultCCM;// testing descending CellChunkMap with multiple chunks in array
private final boolean smallChunks;
private static ChunkCreator chunkCreator;
@ -81,12 +81,13 @@ public class TestCellFlatSet extends TestCase {
chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
assertTrue(chunkCreator != null);
smallChunks = false;
} else {
// chunkCreator with smaller chunk size, so only 3 cell-representations can accommodate a chunk
chunkCreator = ChunkCreator.initialize(64, false,
chunkCreator = ChunkCreator.initialize(SMALL_CHUNK_SIZE, false,
globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
assertTrue(chunkCreator != null);
smallChunks = true;
}
}
@ -125,9 +126,9 @@ public class TestCellFlatSet extends TestCase {
ascCCM = setUpCellChunkMap(true);
descCCM = setUpCellChunkMap(false);
// ascMultCCM = setUpCellChunkMap(true);
// descMultCCM = setUpCellChunkMap(false);
if (smallChunks == true) { // check jumbo chunks as well
ascCCM = setUpJumboCellChunkMap(true);
}
}
/* Create and test ascending CellSet based on CellArrayMap */
@ -278,8 +279,8 @@ public class TestCellFlatSet extends TestCase {
// allocate new chunks and use the data chunk to hold the full data of the cells
// and the index chunk to hold the cell-representations
Chunk dataChunk = chunkCreator.getChunk();
Chunk idxChunk = chunkCreator.getChunk();
Chunk dataChunk = chunkCreator.getChunk(CompactingMemStore.IndexType.CHUNK_MAP);
Chunk idxChunk = chunkCreator.getChunk(CompactingMemStore.IndexType.CHUNK_MAP);
// the array of index chunks to be used as a basis for CellChunkMap
Chunk chunkArray[] = new Chunk[8]; // according to test currently written 8 is way enough
int chunkArrayIdx = 0;
@ -295,7 +296,8 @@ public class TestCellFlatSet extends TestCase {
for (Cell kv: cellArray) {
// do we have enough space to write the cell data on the data chunk?
if (dataOffset + KeyValueUtil.length(kv) > chunkCreator.getChunkSize()) {
dataChunk = chunkCreator.getChunk(); // allocate more data chunks if needed
// allocate more data chunks if needed
dataChunk = chunkCreator.getChunk(CompactingMemStore.IndexType.CHUNK_MAP);
dataBuffer = dataChunk.getData();
dataOffset = ChunkCreator.SIZEOF_CHUNK_HEADER;
}
@ -304,7 +306,8 @@ public class TestCellFlatSet extends TestCase {
// do we have enough space to write the cell-representation on the index chunk?
if (idxOffset + ClassSize.CELL_CHUNK_MAP_ENTRY > chunkCreator.getChunkSize()) {
idxChunk = chunkCreator.getChunk(); // allocate more index chunks if needed
// allocate more index chunks if needed
idxChunk = chunkCreator.getChunk(CompactingMemStore.IndexType.CHUNK_MAP);
idxBuffer = idxChunk.getData();
idxOffset = ChunkCreator.SIZEOF_CHUNK_HEADER;
chunkArray[chunkArrayIdx++] = idxChunk;
@ -317,4 +320,53 @@ public class TestCellFlatSet extends TestCase {
return new CellChunkMap(CellComparatorImpl.COMPARATOR,chunkArray,0,NUM_OF_CELLS,!asc);
}
/* Create CellChunkMap with four cells inside the data jumbo chunk. This test is working only
** with small chunks sized SMALL_CHUNK_SIZE (64) bytes */
private CellChunkMap setUpJumboCellChunkMap(boolean asc) {
int smallChunkSize = SMALL_CHUNK_SIZE+8;
// allocate new chunks and use the data JUMBO chunk to hold the full data of the cells
// and the normal index chunk to hold the cell-representations
Chunk dataJumboChunk =
chunkCreator.getChunk(CompactingMemStore.IndexType.CHUNK_MAP, smallChunkSize);
Chunk idxChunk = chunkCreator.getChunk(CompactingMemStore.IndexType.CHUNK_MAP);
// the array of index chunks to be used as a basis for CellChunkMap
Chunk[] chunkArray = new Chunk[8]; // according to test currently written 8 is way enough
int chunkArrayIdx = 0;
chunkArray[chunkArrayIdx++] = idxChunk;
ByteBuffer idxBuffer = idxChunk.getData(); // the buffers of the chunks
ByteBuffer dataBuffer = dataJumboChunk.getData();
int dataOffset = ChunkCreator.SIZEOF_CHUNK_HEADER; // offset inside data buffer
int idxOffset = ChunkCreator.SIZEOF_CHUNK_HEADER; // skip the space for chunk ID
Cell[] cellArray = asc ? ascCells : descCells;
for (Cell kv: cellArray) {
int dataStartOfset = dataOffset;
dataOffset = KeyValueUtil.appendTo(kv, dataBuffer, dataOffset, false); // write deep cell data
// do we have enough space to write the cell-representation on the index chunk?
if (idxOffset + ClassSize.CELL_CHUNK_MAP_ENTRY > chunkCreator.getChunkSize()) {
// allocate more index chunks if needed
idxChunk = chunkCreator.getChunk(CompactingMemStore.IndexType.CHUNK_MAP);
idxBuffer = idxChunk.getData();
idxOffset = ChunkCreator.SIZEOF_CHUNK_HEADER;
chunkArray[chunkArrayIdx++] = idxChunk;
}
// write data chunk id
idxOffset = ByteBufferUtils.putInt(idxBuffer, idxOffset, dataJumboChunk.getId());
idxOffset = ByteBufferUtils.putInt(idxBuffer, idxOffset, dataStartOfset); // offset
idxOffset = ByteBufferUtils.putInt(idxBuffer, idxOffset, KeyValueUtil.length(kv)); // length
idxOffset = ByteBufferUtils.putLong(idxBuffer, idxOffset, kv.getSequenceId()); // seqId
// Jumbo chunks are working only with one cell per chunk, thus always allocate a new jumbo
// data chunk for next cell
dataJumboChunk = chunkCreator.getChunk(CompactingMemStore.IndexType.CHUNK_MAP,smallChunkSize);
dataBuffer = dataJumboChunk.getData();
dataOffset = ChunkCreator.SIZEOF_CHUNK_HEADER;
}
return new CellChunkMap(CellComparatorImpl.COMPARATOR,chunkArray,0,NUM_OF_CELLS,!asc);
}
}

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
@ -564,60 +563,6 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
assertTrue(chunkCreator.getPoolSize() > 0);
}
@Test
public void testFlatteningToCellChunkMap() throws IOException {
// set memstore to flat into CellChunkMap
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(compactionType));
((CompactingMemStore)memstore).initiateType(compactionType);
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
((CompactingMemStore)memstore).setIndexType();
int numOfCells = 8;
String[] keys1 = { "A", "A", "B", "C", "D", "D", "E", "F" }; //A1, A2, B3, C4, D5, D6, E7, F8
// make one cell
byte[] row = Bytes.toBytes(keys1[0]);
byte[] val = Bytes.toBytes(keys1[0] + 0);
KeyValue kv =
new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
System.currentTimeMillis(), val);
// test 1 bucket
int totalCellsLen = addRowsByKeys(memstore, keys1);
long oneCellOnCSLMHeapSize =
ClassSize.align(
ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + KeyValueUtil
.length(kv));
long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and flatten
assertEquals(0, memstore.getSnapshot().getCellsCount());
// One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode.
// totalCellsLen should remain the same
long oneCellOnCCMHeapSize =
ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
+ numOfCells * oneCellOnCCMHeapSize;
assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
MemStoreSize size = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
region.decrMemStoreSize(size); // simulate flusher
ImmutableSegment s = memstore.getSnapshot();
assertEquals(numOfCells, s.getCellsCount());
assertEquals(0, regionServicesForStores.getMemStoreSize());
memstore.clearSnapshot(snapshot.getId());
}
//////////////////////////////////////////////////////////////////////////////
// Compaction tests
//////////////////////////////////////////////////////////////////////////////
@ -793,7 +738,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
memstore.clearSnapshot(snapshot.getId());
}
private int addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
byte[] fam = Bytes.toBytes("testfamily");
byte[] qf = Bytes.toBytes("testqualifier");
long size = hmc.getActive().keySize();

View File

@ -94,7 +94,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
}
// test 1 bucket
long totalCellsLen = addRowsByKeys(memstore, keys1);
long totalCellsLen = addRowsByKeysDataSize(memstore, keys1);
long cellBeforeFlushSize = cellBeforeFlushSize();
long cellAfterFlushSize = cellAfterFlushSize();
long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
@ -140,7 +140,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
String[] keys1 = { "A", "A", "B", "C" };
String[] keys2 = { "A", "B", "D" };
long totalCellsLen1 = addRowsByKeys(memstore, keys1); // INSERT 4
long totalCellsLen1 = addRowsByKeysDataSize(memstore, keys1); // INSERT 4
long cellBeforeFlushSize = cellBeforeFlushSize();
long cellAfterFlushSize = cellAfterFlushSize();
long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
@ -164,7 +164,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
long totalCellsLen2 = addRowsByKeys(memstore, keys2); // INSERT 3 (3+3=6)
long totalCellsLen2 = addRowsByKeysDataSize(memstore, keys2); // INSERT 3 (3+3=6)
long totalHeapSize2 = 3 * cellBeforeFlushSize;
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
@ -202,7 +202,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
String[] keys2 = { "A", "B", "D" };
String[] keys3 = { "D", "B", "B" };
long totalCellsLen1 = addRowsByKeys(memstore, keys1);
long totalCellsLen1 = addRowsByKeysDataSize(memstore, keys1);
long cellBeforeFlushSize = cellBeforeFlushSize();
long cellAfterFlushSize = cellAfterFlushSize();
long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
@ -223,7 +223,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
long totalCellsLen2 = addRowsByKeys(memstore, keys2);
long totalCellsLen2 = addRowsByKeysDataSize(memstore, keys2);
long totalHeapSize2 = 3 * cellBeforeFlushSize;
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
@ -237,7 +237,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
long totalCellsLen3 = addRowsByKeys(memstore, keys3);
long totalCellsLen3 = addRowsByKeysDataSize(memstore, keys3);
long totalHeapSize3 = 3 * cellBeforeFlushSize;
assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
regionServicesForStores.getMemStoreSize());
@ -294,7 +294,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(compactionType));
((CompactingMemStore)memstore).initiateType(compactionType);
addRowsByKeys(memstore, keys1);
addRowsByKeysDataSize(memstore, keys1);
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline should not compact
@ -303,7 +303,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
}
assertEquals(0, memstore.getSnapshot().getCellsCount());
addRowsByKeys(memstore, keys2); // also should only flatten
addRowsByKeysDataSize(memstore, keys2); // also should only flatten
int counter2 = 0;
for ( Segment s : memstore.getSegments()) {
@ -322,7 +322,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
}
assertEquals(12, counter3);
addRowsByKeys(memstore, keys3);
addRowsByKeysDataSize(memstore, keys3);
int counter4 = 0;
for ( Segment s : memstore.getSegments()) {
@ -604,7 +604,62 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
}
private long addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
@Test
public void testFlatteningToCellChunkMap() throws IOException {
// set memstore to flat into CellChunkMap
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(compactionType));
((CompactingMemStore)memstore).initiateType(compactionType);
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
((CompactingMemStore)memstore).setIndexType();
int numOfCells = 8;
String[] keys1 = { "A", "A", "B", "C", "D", "D", "E", "F" }; //A1, A2, B3, C4, D5, D6, E7, F8
// make one cell
byte[] row = Bytes.toBytes(keys1[0]);
byte[] val = Bytes.toBytes(keys1[0] + 0);
KeyValue kv =
new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
System.currentTimeMillis(), val);
// test 1 bucket
int totalCellsLen = addRowsByKeys(memstore, keys1);
long oneCellOnCSLMHeapSize =
ClassSize.align(
ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + KeyValueUtil
.length(kv));
long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and flatten
assertEquals(0, memstore.getSnapshot().getCellsCount());
// One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode.
// totalCellsLen should remain the same
long oneCellOnCCMHeapSize =
ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
+ numOfCells * oneCellOnCCMHeapSize;
assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
MemStoreSize size = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
region.decrMemStoreSize(size); // simulate flusher
ImmutableSegment s = memstore.getSnapshot();
assertEquals(numOfCells, s.getCellsCount());
assertEquals(0, regionServicesForStores.getMemStoreSize());
memstore.clearSnapshot(snapshot.getId());
}
private long addRowsByKeysDataSize(final AbstractMemStore hmc, String[] keys) {
byte[] fam = Bytes.toBytes("testfamily");
byte[] qf = Bytes.toBytes("testqualifier");
MemStoreSizing memstoreSizing = new MemStoreSizing();