HBASE-19133 Transfer big cells or upserted/appended cells into

MSLAB upon flattening to CellChunkMap

Signed-off-by: Gali Sheffi <gsheffi@oath.com>
This commit is contained in:
gsheffi 2017-12-27 11:55:14 +02:00 committed by Michael Stack
parent c3b4f788b1
commit 0c4b520d97
11 changed files with 302 additions and 22 deletions

View File

@ -104,7 +104,7 @@ public abstract class AbstractMemStore implements MemStore {
@Override
public void add(Cell cell, MemStoreSizing memstoreSizing) {
Cell toAdd = maybeCloneWithAllocator(cell);
Cell toAdd = maybeCloneWithAllocator(cell, false);
boolean mslabUsed = (toAdd != cell);
// This cell data is backed by the same byte[] where we read request in RPC(See HBASE-15180). By
// default MSLAB is ON and we might have copied cell to MSLAB area. If not we must do below deep
@ -268,8 +268,21 @@ public abstract class AbstractMemStore implements MemStore {
return result;
}
private Cell maybeCloneWithAllocator(Cell cell) {
return active.maybeCloneWithAllocator(cell);
/**
 * Clones the given cell through the active segment and returns the outcome.
 *
 * If the segment has a memory allocator the cell is cloned into that space and the clone is
 * returned; otherwise the given cell itself is returned.
 *
 * A cell whose size is too big (bigger than maxAlloc) is normally not allocated on MSLAB.
 * The process of flattening to CellChunkMap assumes that all cells are allocated on MSLAB,
 * therefore during that process forceCloneOfBigCell is set to 'true' and the cell is copied
 * into MSLAB.
 *
 * @param cell the cell to clone
 * @param forceCloneOfBigCell true only during the process of flattening to CellChunkMap.
 * @return either the given cell or its clone
 */
private Cell maybeCloneWithAllocator(Cell cell, boolean forceCloneOfBigCell) {
  // the active segment decides whether a clone is made
  final Cell cloneOrOriginal = active.maybeCloneWithAllocator(cell, forceCloneOfBigCell);
  return cloneOrOriginal;
}
/*

View File

@ -101,11 +101,11 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
cells[i] = c;
} else {
// now we just copy it to the new segment (also MSLAB copy)
cells[i] = maybeCloneWithAllocator(c);
cells[i] = maybeCloneWithAllocator(c, false);
}
// second parameter true, because in compaction/merge the addition of the cell to new segment
// is always successful
updateMetaInfo(c, true, null); // updates the size per cell
updateMetaInfo(cells[i], true, null); // updates the size per cell
if(action == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS) {
//counting number of unique keys
if (prev != null) {

View File

@ -18,18 +18,20 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.ByteBufferKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.yetus.audience.InterfaceAudience;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
* CellChunkImmutableSegment extends the API supported by a {@link Segment},
@ -109,15 +111,24 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
chunks[i] = this.getMemStoreLAB().getNewExternalChunk();
}
while (iterator.hasNext()) { // the iterator hides the elimination logic for compaction
boolean alreadyCopied = false;
Cell c = iterator.next();
numOfCellsAfterCompaction++;
assert (c instanceof ByteBufferKeyValue); // shouldn't get here anything but ByteBufferKeyValue
assert(c instanceof ExtendedCell);
if (((ExtendedCell)c).getChunkId() == ExtendedCell.CELL_NOT_BASED_ON_CHUNK) {
// CellChunkMap assumes all cells are allocated on MSLAB.
// Therefore, cells which are not allocated on MSLAB initially,
// are copied into MSLAB here.
c = copyCellIntoMSLAB(c);
alreadyCopied = true;
}
if (offsetInCurentChunk + ClassSize.CELL_CHUNK_MAP_ENTRY > chunkSize) {
currentChunkIdx++; // continue to the next index chunk
offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
}
if (action == MemStoreCompactionStrategy.Action.COMPACT) {
c = maybeCloneWithAllocator(c); // for compaction copy cell to the new segment (MSLAB copy)
if (action == MemStoreCompactionStrategy.Action.COMPACT && !alreadyCopied) {
// for compaction copy cell to the new segment (MSLAB copy)
c = maybeCloneWithAllocator(c, false);
}
offsetInCurentChunk = // add the Cell reference to the index chunk
createCellReference((ByteBufferKeyValue)c, chunks[currentChunkIdx].getData(),
@ -153,7 +164,6 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
// Create CellSet based on CellChunkMap from current ConcurrentSkipListMap based CellSet
// (without compacting iterator)
// This is a service for not-flat immutable segments
// Assumption: cells do not exceed chunk size!
private void reinitializeCellSet(
int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet,
MemStoreCompactionStrategy.Action action) {
@ -175,7 +185,13 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
Cell prev = null;
try {
while ((curCell = segmentScanner.next()) != null) {
assert (curCell instanceof ByteBufferKeyValue); // shouldn't get here anything but ByteBufferKeyValue
assert(curCell instanceof ExtendedCell);
if (((ExtendedCell)curCell).getChunkId() == ExtendedCell.CELL_NOT_BASED_ON_CHUNK) {
// CellChunkMap assumes all cells are allocated on MSLAB.
// Therefore, cells which are not allocated on MSLAB initially,
// are copied into MSLAB here.
curCell = copyCellIntoMSLAB(curCell);
}
if (offsetInCurentChunk + ClassSize.CELL_CHUNK_MAP_ENTRY > chunkSize) {
// continue to the next metadata chunk
currentChunkIdx++;
@ -231,4 +247,23 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
}
return numberOfChunks;
}
/**
 * Copies the given cell into MSLAB (forcing the copy) and adjusts this segment's size
 * accounting by the difference between the cell before and after the copy.
 *
 * This takes care of the special case in which a cell is copied from on-heap to (probably
 * off-heap) MSLAB. A cell allocated as an on-heap JVM object (byte array) occupies a slightly
 * different amount of memory than the same cell serialized and allocated on the MSLAB.
 * Only the difference between the object size and the serialized size is applied here; this
 * is a decrease, as the serialized cell is a bit smaller. The actual size of the cell is not
 * added yet, and will be added (only in compaction) in initializeCellSet#updateMetaInfo().
 *
 * @param cell the cell to copy into MSLAB
 * @return the cell returned by the forced clone
 */
private Cell copyCellIntoMSLAB(Cell cell) {
  // measurements of the cell as it is held before the copy
  final long heapSizeBefore = heapSizeChange(cell, true);
  final long cellLengthBefore = getCellLength(cell);

  // 'true' forces the copy into MSLAB
  final Cell copied = maybeCloneWithAllocator(cell, true);

  // the same measurements, taken on the result of the copy
  final long heapSizeAfter = heapSizeChange(copied, true);
  final long cellLengthAfter = getCellLength(copied);

  //TODO: maybe need to update the dataSize of the region
  incSize(cellLengthAfter - cellLengthBefore, heapSizeAfter - heapSizeBefore);
  return copied;
}
}

View File

@ -1,4 +1,3 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -164,9 +163,11 @@ public class ChunkCreator {
if (jumboSize <= chunkSize) {
LOG.warn("Jumbo chunk size " + jumboSize + " must be more than regular chunk size "
+ chunkSize + ". Converting to regular chunk.");
getChunk(chunkIndexType,chunkSize);
return getChunk(chunkIndexType,chunkSize);
}
return getChunk(chunkIndexType, jumboSize);
// the size of the allocation includes
// both the size requested and a place for the Chunk's header
return getChunk(chunkIndexType, jumboSize + SIZEOF_CHUNK_HEADER);
}
/**

View File

@ -110,7 +110,7 @@ public class CompositeImmutableSegment extends ImmutableSegment {
* @return either the given cell or its clone
*/
@Override
public Cell maybeCloneWithAllocator(Cell cell) {
public Cell maybeCloneWithAllocator(Cell cell, boolean forceCloneOfBigCell) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}

View File

@ -45,6 +45,11 @@ public class ImmutableMemStoreLAB implements MemStoreLAB {
throw new IllegalStateException("This is an Immutable MemStoreLAB.");
}
/**
 * Not supported by this implementation: every call throws.
 *
 * @param cell the cell requested to be copied (not used)
 * @return never returns normally
 * @throws IllegalStateException always
 */
@Override
public Cell forceCopyOfBigCellInto(Cell cell) {
throw new IllegalStateException("This is an Immutable MemStoreLAB.");
}
/* Creating chunk to be used as index chunk in CellChunkMap, part of the chunks array.
** Returning a new chunk, without replacing current chunk,
** meaning MSLABImpl does not make the returned chunk as CurChunk.

View File

@ -68,6 +68,17 @@ public interface MemStoreLAB {
*/
Cell copyCellInto(Cell cell);
/**
 * Allocates a slice in this LAB and copies the passed Cell into this area. Returns a new Cell
 * instance over the copied data. When this MemStoreLAB can not copy this Cell, it returns null.
 *
 * Since the process of flattening to CellChunkMap assumes all cells are allocated on MSLAB,
 * and since copyCellInto does not copy big cells (for whom size > maxAlloc) into MSLAB,
 * this method is called while the process of flattening to CellChunkMap is running,
 * for forcing the allocation of big cells on this MSLAB.
 *
 * @param cell the cell to copy into this MSLAB
 * @return a new Cell over the copied data, or null when this MemStoreLAB can not copy the cell
 */
Cell forceCopyOfBigCellInto(Cell cell);
/**
* Close instance since it won't be used any more, try to put the chunks back to pool
*/
@ -92,7 +103,7 @@ public interface MemStoreLAB {
Chunk getNewExternalChunk();
/* Creating chunk to be used as data chunk in CellChunkMap.
** This chunk is bigger the normal constant chunk size, and thus called JumboChunk it is used for
** This chunk is bigger than normal constant chunk size, and thus called JumboChunk it is used for
** jumbo cells (which size is bigger than normal chunks).
** Jumbo Chunks are needed only for CCM and thus are created only in
** CompactingMemStore.IndexType.CHUNK_MAP type.

View File

@ -107,6 +107,32 @@ public class MemStoreLABImpl implements MemStoreLAB {
@Override
public Cell copyCellInto(Cell cell) {
return copyCellInto(cell, maxAlloc);
}
/**
 * Copies the given cell into this MSLAB even when it is bigger than maxAlloc.
 *
 * When a cell's size is too big (bigger than maxAlloc), copyCellInto does not allocate it
 * on MSLAB. Since the process of flattening to CellChunkMap assumes that all cells are
 * allocated on MSLAB, during this process the big cells are copied into MSLAB using this
 * method.
 *
 * A cell whose length does not exceed chunkSize goes through the regular copy path with
 * chunkSize as the allocation limit; a longer cell is written into a chunk obtained from
 * getNewExternalJumboChunk.
 *
 * NOTE(review): the length compared with chunkSize does not include a chunk header --
 * confirm that a cell of (almost) chunkSize bytes can really be allocated from a regular chunk.
 *
 * @param cell the cell to copy
 * @return the result of the copy
 */
@Override
public Cell forceCopyOfBigCellInto(Cell cell) {
  final int cellLength = KeyValueUtil.length(cell);
  Preconditions.checkArgument(cellLength >= 0, "negative size");
  if (cellLength > chunkSize) {
    // does not fit the regular chunk size: ask for a jumbo chunk sized by the cell
    final Chunk jumbo = getNewExternalJumboChunk(cellLength);
    final int offsetInJumbo = jumbo.alloc(cellLength);
    return copyToChunkCell(cell, jumbo.getData(), offsetInJumbo, cellLength);
  }
  // Using copyCellInto for cells which are bigger than the original maxAlloc
  return copyCellInto(cell, chunkSize);
}
private Cell copyCellInto(Cell cell, int maxAlloc) {
int size = KeyValueUtil.length(cell);
Preconditions.checkArgument(size >= 0, "negative size");
// Callers should satisfy large allocations directly from JVM since they

View File

@ -158,14 +158,25 @@ public abstract class Segment {
/**
* If the segment has a memory allocator the cell is being cloned to this space, and returned;
* otherwise the given cell is returned
*
* When a cell's size is too big (bigger than maxAlloc), it is not allocated on MSLAB.
* Since the process of flattening to CellChunkMap assumes that all cells
* are allocated on MSLAB, during this process, the input parameter
* forceCloneOfBigCell is set to 'true' and the cell is copied into MSLAB.
*
* @return either the given cell or its clone
*/
public Cell maybeCloneWithAllocator(Cell cell) {
public Cell maybeCloneWithAllocator(Cell cell, boolean forceCloneOfBigCell) {
if (this.memStoreLAB == null) {
return cell;
}
Cell cellFromMslab = this.memStoreLAB.copyCellInto(cell);
Cell cellFromMslab = null;
if (forceCloneOfBigCell) {
cellFromMslab = this.memStoreLAB.forceCopyOfBigCellInto(cell);
} else {
cellFromMslab = this.memStoreLAB.copyCellInto(cell);
}
return (cellFromMslab != null) ? cellFromMslab : cell;
}

View File

@ -836,6 +836,27 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
return totalLen;
}
// Adds one cell per key, every cell carrying the given value, so that the caller controls
// the val size of the added cells. Returns the summed length of the added cells.
protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys, byte[] val) {
  final byte[] family = Bytes.toBytes("testfamily");
  final byte[] qualifier = Bytes.toBytes("testqualifier");
  // sizes of the active segment before anything is added
  final long keySizeBefore = hmc.getActive().keySize();
  final long heapSizeBefore = hmc.getActive().heapSize();
  int addedLength = 0;
  for (String key : keys) {
    final long ts = System.currentTimeMillis();
    Threads.sleep(1); // to make sure each kv gets a different ts
    final KeyValue kv = new KeyValue(Bytes.toBytes(key), family, qualifier, ts, val);
    addedLength += kv.getLength();
    hmc.add(kv, null);
    LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
  }
  // report only what this call added to the active segment
  final long keySizeDelta = hmc.getActive().keySize() - keySizeBefore;
  final long heapSizeDelta = hmc.getActive().heapSize() - heapSizeBefore;
  regionServicesForStores.addMemStoreSize(new MemStoreSize(keySizeDelta, heapSizeDelta));
  return addedLength;
}
private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge {
long t = 1234;

View File

@ -646,8 +646,6 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and flatten
assertEquals(0, memstore.getSnapshot().getCellsCount());
// One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode.
// totalCellsLen should remain the same
long oneCellOnCCMHeapSize =
ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
@ -666,6 +664,165 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
memstore.clearSnapshot(snapshot.getId());
}
/**
 * CellChunkMap Segment index requires all cell data to be written in the MSLAB Chunks.
 * Even though MSLAB is enabled, cells bigger than maxAlloc
 * (even if smaller than the size of a chunk) are not written in the MSLAB Chunks.
 * If such cells are found in the process of flattening into CellChunkMap
 * (in-memory-flush) they need to be copied into MSLAB.
 * This test checks that the process of flattening into CellChunkMap succeeds,
 * even when such big cells are allocated.
 */
@Test
public void testFlatteningToBigCellChunkMap() throws IOException {
  if (!toCellChunkMap) {
    return; // this scenario is exercised only when toCellChunkMap is set
  }
  final CompactingMemStore compacting = (CompactingMemStore) memstore;

  // set memstore to flat into CellChunkMap
  final MemoryCompactionPolicy policy = MemoryCompactionPolicy.BASIC;
  memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(policy));
  ((MyCompactingMemStore) memstore).initiateType(policy, memstore.getConfiguration());
  memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
      String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
  compacting.setIndexType();

  // the value is built from MAX_ALLOC_DEFAULT 'A' characters
  final char[] valueChars = new char[MemStoreLAB.MAX_ALLOC_DEFAULT];
  java.util.Arrays.fill(valueChars, 'A');
  final String[] keys = { "A", "B", "C", "D" };
  final int numOfCells = keys.length;

  // make one cell; it is used below for the expected-size arithmetic
  final byte[] row = Bytes.toBytes(keys[0]);
  final byte[] val = Bytes.toBytes(new String(valueChars));
  final KeyValue kv = new KeyValue(row, Bytes.toBytes("testfamily"),
      Bytes.toBytes("testqualifier"), System.currentTimeMillis(), val);

  // test 1 bucket
  final int totalCellsLen = addRowsByKeys(memstore, keys, val);
  final long cellOnCslmHeapSize =
      ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize());
  long expectedHeapSize = numOfCells * cellOnCslmHeapSize + MutableSegment.DEEP_OVERHEAD;
  assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
  assertEquals(expectedHeapSize, compacting.heapSize());

  compacting.flushInMemory(); // push keys to pipeline and flatten
  while (compacting.isMemStoreFlushingInMemory()) {
    Threads.sleep(10);
  }
  assertEquals(0, memstore.getSnapshot().getCellsCount());

  // No compaction is expected in BASIC mode, so totalCellsLen should remain the same;
  // the heap size is now expected per cell in its CellChunkMap form
  final long cellOnCcmHeapSize =
      ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
  expectedHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
      + numOfCells * cellOnCcmHeapSize;
  assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
  assertEquals(expectedHeapSize, compacting.heapSize());

  final MemStoreSize flushable = memstore.getFlushableSize();
  final MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
  region.decrMemStoreSize(flushable); // simulate flusher
  final ImmutableSegment snapshotSegment = memstore.getSnapshot();
  assertEquals(numOfCells, snapshotSegment.getCellsCount());
  assertEquals(0, regionServicesForStores.getMemStoreSize());
  memstore.clearSnapshot(snapshot.getId());
}
/**
 * CellChunkMap Segment index requires all cell data to be written in the MSLAB Chunks.
 * Even though MSLAB is enabled, cells bigger than the size of a chunk are not
 * written in the MSLAB Chunks.
 * If such cells are found in the process of flattening into CellChunkMap
 * (in-memory-flush) they need to be copied into MSLAB.
 * This test checks that the process of flattening into CellChunkMap succeeds,
 * even when such big cells are allocated.
 */
@Test
public void testFlatteningToJumboCellChunkMap() throws IOException {
  if (!toCellChunkMap) {
    return; // this scenario is exercised only when toCellChunkMap is set
  }
  final CompactingMemStore compacting = (CompactingMemStore) memstore;

  // set memstore to flat into CellChunkMap
  final MemoryCompactionPolicy policy = MemoryCompactionPolicy.BASIC;
  memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(policy));
  ((MyCompactingMemStore) memstore).initiateType(policy, memstore.getConfiguration());
  memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
      String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
  compacting.setIndexType();

  // the value is built from CHUNK_SIZE_DEFAULT 'A' characters
  final char[] valueChars = new char[MemStoreLAB.CHUNK_SIZE_DEFAULT];
  java.util.Arrays.fill(valueChars, 'A');
  final String[] firstKeys = { "A" };
  final int numOfCells = firstKeys.length;

  // make one cell; it is used below for the expected-size arithmetic
  final byte[] row = Bytes.toBytes(firstKeys[0]);
  final byte[] val = Bytes.toBytes(new String(valueChars));
  final KeyValue kv = new KeyValue(row, Bytes.toBytes("testfamily"),
      Bytes.toBytes("testqualifier"), System.currentTimeMillis(), val);

  // test 1 bucket
  final int totalCellsLen = addRowsByKeys(memstore, firstKeys, val);
  final long cellOnCslmHeapSize =
      ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize());
  long expectedHeapSize = numOfCells * cellOnCslmHeapSize + MutableSegment.DEEP_OVERHEAD;
  assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
  assertEquals(expectedHeapSize, compacting.heapSize());

  compacting.flushInMemory(); // push keys to pipeline and flatten
  while (compacting.isMemStoreFlushingInMemory()) {
    Threads.sleep(10);
  }
  assertEquals(0, memstore.getSnapshot().getCellsCount());

  // No compaction is expected in BASIC mode, so totalCellsLen should remain the same;
  // the heap size is now expected per cell in its CellChunkMap form
  final long cellOnCcmHeapSize =
      ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
  expectedHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
      + numOfCells * cellOnCcmHeapSize;
  assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
  assertEquals(expectedHeapSize, compacting.heapSize());

  final MemStoreSize flushable = memstore.getFlushableSize();
  final MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
  region.decrMemStoreSize(flushable); // simulate flusher
  final ImmutableSegment snapshotSegment = memstore.getSnapshot();
  assertEquals(numOfCells, snapshotSegment.getCellsCount());
  assertEquals(0, regionServicesForStores.getMemStoreSize());
  memstore.clearSnapshot(snapshot.getId());

  // second round: three more cells carrying the same value
  final String[] secondKeys = { "C", "D", "E" };
  addRowsByKeys(memstore, secondKeys, val);
  while (compacting.isMemStoreFlushingInMemory()) {
    Threads.sleep(10);
  }
  // expected: one cell counted at its CSLM size and two cells at their CCM size
  expectedHeapSize = 1 * cellOnCslmHeapSize + MutableSegment.DEEP_OVERHEAD
      + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
      + 2 * cellOnCcmHeapSize;
  assertEquals(expectedHeapSize, compacting.heapSize());
}
private long addRowsByKeysDataSize(final AbstractMemStore hmc, String[] keys) {
byte[] fam = Bytes.toBytes("testfamily");