HBASE-15721 Optimization in cloning cells into MSLAB.
This commit is contained in:
parent
58e843dae2
commit
912ed17286
|
@ -543,6 +543,15 @@ public final class CellUtil {
|
|||
}
|
||||
return len;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte[] buf, int offset) {
|
||||
offset = KeyValueUtil.appendToByteArray(this.cell, buf, offset, false);
|
||||
int tagsLen = this.tags.length;
|
||||
assert tagsLen > 0;
|
||||
offset = Bytes.putAsShort(buf, offset, tagsLen);
|
||||
System.arraycopy(this.tags, 0, buf, offset, tagsLen);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -59,4 +59,11 @@ public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestam
|
|||
*/
|
||||
// TODO remove the boolean param once HBASE-16706 is done.
|
||||
int getSerializedSize(boolean withTags);
|
||||
|
||||
/**
|
||||
* Write the given Cell into the given buf's offset.
|
||||
* @param buf The buffer where to write the Cell.
|
||||
* @param offset The offset within buffer, to write the Cell.
|
||||
*/
|
||||
void write(byte[] buf, int offset);
|
||||
}
|
|
@ -2492,6 +2492,11 @@ public class KeyValue implements ExtendedCell {
|
|||
return this.getKeyLength() + this.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte[] buf, int offset) {
|
||||
System.arraycopy(this.bytes, this.offset, buf, offset, this.length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Comparator that compares row component only of a KeyValue.
|
||||
*/
|
||||
|
|
|
@ -136,7 +136,7 @@ public class KeyValueUtil {
|
|||
public static byte[] copyToNewByteArray(final Cell cell) {
|
||||
int v1Length = length(cell);
|
||||
byte[] backingBytes = new byte[v1Length];
|
||||
appendToByteArray(cell, backingBytes, 0);
|
||||
appendToByteArray(cell, backingBytes, 0, true);
|
||||
return backingBytes;
|
||||
}
|
||||
|
||||
|
@ -156,15 +156,13 @@ public class KeyValueUtil {
|
|||
|
||||
/**************** copy key and value *********************/
|
||||
|
||||
public static int appendToByteArray(final Cell cell, final byte[] output, final int offset) {
|
||||
// TODO when cell instance of KV we can bypass all steps and just do backing single array
|
||||
// copy(?)
|
||||
public static int appendToByteArray(Cell cell, byte[] output, int offset, boolean withTags) {
|
||||
int pos = offset;
|
||||
pos = Bytes.putInt(output, pos, keyLength(cell));
|
||||
pos = Bytes.putInt(output, pos, cell.getValueLength());
|
||||
pos = appendKeyTo(cell, output, pos);
|
||||
pos = CellUtil.copyValueTo(cell, output, pos);
|
||||
if ((cell.getTagsLength() > 0)) {
|
||||
if (withTags && (cell.getTagsLength() > 0)) {
|
||||
pos = Bytes.putAsShort(output, pos, cell.getTagsLength());
|
||||
pos = CellUtil.copyTagTo(cell, output, pos);
|
||||
}
|
||||
|
@ -178,7 +176,7 @@ public class KeyValueUtil {
|
|||
*/
|
||||
public static ByteBuffer copyToNewByteBuffer(final Cell cell) {
|
||||
byte[] bytes = new byte[length(cell)];
|
||||
appendToByteArray(cell, bytes, 0);
|
||||
appendToByteArray(cell, bytes, 0, true);
|
||||
ByteBuffer buffer = ByteBuffer.wrap(bytes);
|
||||
return buffer;
|
||||
}
|
||||
|
@ -658,4 +656,31 @@ public class KeyValueUtil {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write the given cell in KeyValue serialization format into the given buf and return a new
|
||||
* KeyValue object around that.
|
||||
*/
|
||||
public static KeyValue copyCellTo(Cell cell, byte[] buf, int offset) {
|
||||
int tagsLen = cell.getTagsLength();
|
||||
int len = length(cell.getRowLength(), cell.getFamilyLength(), cell.getQualifierLength(),
|
||||
cell.getValueLength(), tagsLen, true);
|
||||
if (cell instanceof ExtendedCell) {
|
||||
((ExtendedCell) cell).write(buf, offset);
|
||||
} else {
|
||||
appendToByteArray(cell, buf, offset, true);
|
||||
}
|
||||
KeyValue newKv;
|
||||
if (tagsLen == 0) {
|
||||
// When tagsLen is 0, make a NoTagsKeyValue version of Cell. This is an optimized class which
|
||||
// directly return tagsLen as 0. So we avoid parsing many length components in reading the
|
||||
// tagLength stored in the backing buffer. The Memstore addition of every Cell call
|
||||
// getTagsLength().
|
||||
newKv = new NoTagsKeyValue(buf, offset, len);
|
||||
} else {
|
||||
newKv = new KeyValue(buf, offset, len);
|
||||
}
|
||||
newKv.setSequenceId(cell.getSequenceId());
|
||||
return newKv;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -253,6 +253,11 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell {
|
|||
return this.keyLen + this.getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte[] buf, int offset) {
|
||||
ByteBufferUtils.copyFromBufferToArray(buf, this.buf, this.offset, offset, this.length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return CellUtil.toString(this, true);
|
||||
|
|
|
@ -451,6 +451,12 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
|
|||
withTags);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte[] buf, int offset) {
|
||||
// This is not used in actual flow. Throwing UnsupportedOperationException
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTimestamp(long ts) throws IOException {
|
||||
// This is not used in actual flow. Throwing UnsupportedOperationException
|
||||
|
@ -695,6 +701,12 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
|
|||
// This is not used in actual flow. Throwing UnsupportedOperationException
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte[] buf, int offset) {
|
||||
// This is not used in actual flow. Throwing UnsupportedOperationException
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>
|
||||
|
|
|
@ -26,11 +26,11 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.regionserver.MemStoreChunkPool.PooledChunk;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.util.ByteRange;
|
||||
import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
@ -106,37 +106,31 @@ public class HeapMemStoreLAB implements MemStoreLAB {
|
|||
MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
|
||||
}
|
||||
|
||||
/**
|
||||
* Allocate a slice of the given length.
|
||||
*
|
||||
* If the size is larger than the maximum size specified for this
|
||||
* allocator, returns null.
|
||||
*/
|
||||
@Override
|
||||
public ByteRange allocateBytes(int size) {
|
||||
public Cell copyCellInto(Cell cell) {
|
||||
int size = KeyValueUtil.length(cell);
|
||||
Preconditions.checkArgument(size >= 0, "negative size");
|
||||
|
||||
// Callers should satisfy large allocations directly from JVM since they
|
||||
// don't cause fragmentation as badly.
|
||||
if (size > maxAlloc) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Chunk c = null;
|
||||
int allocOffset = 0;
|
||||
while (true) {
|
||||
Chunk c = getOrMakeChunk();
|
||||
|
||||
c = getOrMakeChunk();
|
||||
// Try to allocate from this chunk
|
||||
int allocOffset = c.alloc(size);
|
||||
allocOffset = c.alloc(size);
|
||||
if (allocOffset != -1) {
|
||||
// We succeeded - this is the common case - small alloc
|
||||
// from a big buffer
|
||||
return new SimpleMutableByteRange(c.getData(), allocOffset, size);
|
||||
break;
|
||||
}
|
||||
|
||||
// not enough space!
|
||||
// try to retire this chunk
|
||||
tryRetireChunk(c);
|
||||
}
|
||||
return KeyValueUtil.copyCellTo(cell, c.getData(), allocOffset);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -17,34 +17,38 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.util.ByteRange;
|
||||
|
||||
/**
|
||||
* A memstore-local allocation buffer.
|
||||
* <p>
|
||||
* The MemStoreLAB is basically a bump-the-pointer allocator that allocates big (2MB) chunks from
|
||||
* and then doles it out to threads that request slices into the array.
|
||||
* and then doles it out to threads that request slices into the array. These chunks can get pooled
|
||||
* as well. See {@link MemStoreChunkPool}.
|
||||
* <p>
|
||||
* The purpose of this is to combat heap fragmentation in the regionserver. By ensuring that all
|
||||
* KeyValues in a given memstore refer only to large chunks of contiguous memory, we ensure that
|
||||
* Cells in a given memstore refer only to large chunks of contiguous memory, we ensure that
|
||||
* large blocks get freed up when the memstore is flushed.
|
||||
* <p>
|
||||
* Without the MSLAB, the byte array allocated during insertion end up interleaved throughout the
|
||||
* heap, and the old generation gets progressively more fragmented until a stop-the-world compacting
|
||||
* collection occurs.
|
||||
* <p>
|
||||
* This manages the large sized chunks. When Cells are to be added to Memstore, MemStoreLAB's
|
||||
* {@link #copyCellInto(Cell)} gets called. This allocates enough size in the chunk to hold this
|
||||
* cell's data and copies into this area and then recreate a Cell over this copied data.
|
||||
* <p>
|
||||
* @see MemStoreChunkPool
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface MemStoreLAB {
|
||||
|
||||
/**
|
||||
* Allocate a slice of the given length. If the size is larger than the maximum size specified for
|
||||
* this allocator, returns null.
|
||||
* @param size
|
||||
* @return {@link ByteRange}
|
||||
* Allocates slice in this LAB and copy the passed Cell into this area. Returns new Cell instance
|
||||
* over the copied the data. When this MemStoreLAB can not copy this Cell, it returns null.
|
||||
*/
|
||||
ByteRange allocateBytes(int size);
|
||||
Cell copyCellInto(Cell cell);
|
||||
|
||||
/**
|
||||
* Close instance since it won't be used any more, try to put the chunks back to pool
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.KeyValue;
|
|||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.util.ByteRange;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
|
||||
|
@ -147,18 +146,8 @@ public abstract class Segment {
|
|||
return cell;
|
||||
}
|
||||
|
||||
int len = getCellLength(cell);
|
||||
ByteRange alloc = this.memStoreLAB.allocateBytes(len);
|
||||
if (alloc == null) {
|
||||
// The allocation was too large, allocator decided
|
||||
// not to do anything with it.
|
||||
return cell;
|
||||
}
|
||||
assert alloc.getBytes() != null;
|
||||
KeyValueUtil.appendToByteArray(cell, alloc.getBytes(), alloc.getOffset());
|
||||
KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len);
|
||||
newKv.setSequenceId(cell.getSequenceId());
|
||||
return newKv;
|
||||
Cell cellFromMslab = this.memStoreLAB.copyCellInto(cell);
|
||||
return (cellFromMslab != null) ? cellFromMslab : cell;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.ByteRange;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
|
@ -73,18 +73,22 @@ public class TestMemStoreChunkPool {
|
|||
MemStoreLAB mslab = new HeapMemStoreLAB(conf);
|
||||
int expectedOff = 0;
|
||||
byte[] lastBuffer = null;
|
||||
final byte[] rk = Bytes.toBytes("r1");
|
||||
final byte[] cf = Bytes.toBytes("f");
|
||||
final byte[] q = Bytes.toBytes("q");
|
||||
// Randomly allocate some bytes
|
||||
for (int i = 0; i < 100; i++) {
|
||||
int size = rand.nextInt(1000);
|
||||
ByteRange alloc = mslab.allocateBytes(size);
|
||||
|
||||
if (alloc.getBytes() != lastBuffer) {
|
||||
int valSize = rand.nextInt(1000);
|
||||
KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
|
||||
int size = KeyValueUtil.length(kv);
|
||||
KeyValue newKv = (KeyValue) mslab.copyCellInto(kv);
|
||||
if (newKv.getBuffer() != lastBuffer) {
|
||||
expectedOff = 0;
|
||||
lastBuffer = alloc.getBytes();
|
||||
lastBuffer = newKv.getBuffer();
|
||||
}
|
||||
assertEquals(expectedOff, alloc.getOffset());
|
||||
assertTrue("Allocation overruns buffer", alloc.getOffset()
|
||||
+ size <= alloc.getBytes().length);
|
||||
assertEquals(expectedOff, newKv.getOffset());
|
||||
assertTrue("Allocation overruns buffer",
|
||||
newKv.getOffset() + size <= newKv.getBuffer().length);
|
||||
expectedOff += size;
|
||||
}
|
||||
// chunks will be put back to pool after close
|
||||
|
@ -94,7 +98,8 @@ public class TestMemStoreChunkPool {
|
|||
// reconstruct mslab
|
||||
mslab = new HeapMemStoreLAB(conf);
|
||||
// chunk should be got from the pool, so we can reuse it.
|
||||
mslab.allocateBytes(1000);
|
||||
KeyValue kv = new KeyValue(rk, cf, q, new byte[10]);
|
||||
mslab.copyCellInto(kv);
|
||||
assertEquals(chunkCount - 1, chunkPool.getPoolSize());
|
||||
}
|
||||
|
||||
|
@ -202,21 +207,24 @@ public class TestMemStoreChunkPool {
|
|||
MemStoreChunkPool oldPool = MemStoreChunkPool.GLOBAL_INSTANCE;
|
||||
final int maxCount = 10;
|
||||
final int initialCount = 5;
|
||||
final int chunkSize = 10;
|
||||
final int chunkSize = 30;
|
||||
final int valSize = 7;
|
||||
MemStoreChunkPool pool = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount, 1);
|
||||
assertEquals(initialCount, pool.getPoolSize());
|
||||
assertEquals(maxCount, pool.getMaxCount());
|
||||
MemStoreChunkPool.GLOBAL_INSTANCE = pool;// Replace the global ref with the new one we created.
|
||||
// Used it for the testing. Later in finally we put
|
||||
// back the original
|
||||
final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
|
||||
new byte[valSize]);
|
||||
try {
|
||||
Runnable r = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
MemStoreLAB memStoreLAB = new HeapMemStoreLAB(conf);
|
||||
for (int i = 0; i < maxCount; i++) {
|
||||
memStoreLAB.allocateBytes(chunkSize);// Try allocate size = chunkSize. Means every
|
||||
// allocate call will result in a new chunk
|
||||
memStoreLAB.copyCellInto(kv);// Try allocate size = chunkSize. Means every
|
||||
// allocate call will result in a new chunk
|
||||
}
|
||||
// Close MemStoreLAB so that all chunks will be tried to be put back to pool
|
||||
memStoreLAB.close();
|
||||
|
|
|
@ -27,12 +27,15 @@ import java.util.Random;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.MultithreadedTestUtil;
|
||||
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.ByteRange;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.common.collect.Iterables;
|
||||
|
@ -45,6 +48,10 @@ import org.junit.experimental.categories.Category;
|
|||
@Category({RegionServerTests.class, SmallTests.class})
|
||||
public class TestMemStoreLAB {
|
||||
|
||||
private static final byte[] rk = Bytes.toBytes("r1");
|
||||
private static final byte[] cf = Bytes.toBytes("f");
|
||||
private static final byte[] q = Bytes.toBytes("q");
|
||||
|
||||
/**
|
||||
* Test a bunch of random allocations
|
||||
*/
|
||||
|
@ -58,16 +65,17 @@ public class TestMemStoreLAB {
|
|||
// should be reasonable for unit test and also cover wraparound
|
||||
// behavior
|
||||
for (int i = 0; i < 100000; i++) {
|
||||
int size = rand.nextInt(1000);
|
||||
ByteRange alloc = mslab.allocateBytes(size);
|
||||
|
||||
if (alloc.getBytes() != lastBuffer) {
|
||||
int valSize = rand.nextInt(1000);
|
||||
KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
|
||||
int size = KeyValueUtil.length(kv);
|
||||
KeyValue newKv = (KeyValue) mslab.copyCellInto(kv);
|
||||
if (newKv.getBuffer() != lastBuffer) {
|
||||
expectedOff = 0;
|
||||
lastBuffer = alloc.getBytes();
|
||||
lastBuffer = newKv.getBuffer();
|
||||
}
|
||||
assertEquals(expectedOff, alloc.getOffset());
|
||||
assertEquals(expectedOff, newKv.getOffset());
|
||||
assertTrue("Allocation overruns buffer",
|
||||
alloc.getOffset() + size <= alloc.getBytes().length);
|
||||
newKv.getOffset() + size <= newKv.getBuffer().length);
|
||||
expectedOff += size;
|
||||
}
|
||||
}
|
||||
|
@ -75,10 +83,10 @@ public class TestMemStoreLAB {
|
|||
@Test
|
||||
public void testLABLargeAllocation() {
|
||||
MemStoreLAB mslab = new HeapMemStoreLAB();
|
||||
ByteRange alloc = mslab.allocateBytes(2*1024*1024);
|
||||
assertNull("2MB allocation shouldn't be satisfied by LAB.",
|
||||
alloc);
|
||||
}
|
||||
KeyValue kv = new KeyValue(rk, cf, q, new byte[2 * 1024 * 1024]);
|
||||
Cell newCell = mslab.copyCellInto(kv);
|
||||
assertNull("2MB allocation shouldn't be satisfied by LAB.", newCell);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test allocation from lots of threads, making sure the results don't
|
||||
|
@ -103,10 +111,12 @@ public class TestMemStoreLAB {
|
|||
private Random r = new Random();
|
||||
@Override
|
||||
public void doAnAction() throws Exception {
|
||||
int size = r.nextInt(1000);
|
||||
ByteRange alloc = mslab.allocateBytes(size);
|
||||
int valSize = r.nextInt(1000);
|
||||
KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
|
||||
int size = KeyValueUtil.length(kv);
|
||||
KeyValue newKv = (KeyValue) mslab.copyCellInto(kv);
|
||||
totalAllocated.addAndGet(size);
|
||||
allocsByThisThread.add(new AllocRecord(alloc, size));
|
||||
allocsByThisThread.add(new AllocRecord(newKv.getBuffer(), newKv.getOffset(), size));
|
||||
}
|
||||
};
|
||||
ctx.addThread(t);
|
||||
|
@ -129,12 +139,12 @@ public class TestMemStoreLAB {
|
|||
if (rec.size == 0) continue;
|
||||
|
||||
Map<Integer, AllocRecord> mapForThisByteArray =
|
||||
mapsByChunk.get(rec.alloc.getBytes());
|
||||
mapsByChunk.get(rec.alloc);
|
||||
if (mapForThisByteArray == null) {
|
||||
mapForThisByteArray = Maps.newTreeMap();
|
||||
mapsByChunk.put(rec.alloc.getBytes(), mapForThisByteArray);
|
||||
mapsByChunk.put(rec.alloc, mapForThisByteArray);
|
||||
}
|
||||
AllocRecord oldVal = mapForThisByteArray.put(rec.alloc.getOffset(), rec);
|
||||
AllocRecord oldVal = mapForThisByteArray.put(rec.offset, rec);
|
||||
assertNull("Already had an entry " + oldVal + " for allocation " + rec,
|
||||
oldVal);
|
||||
}
|
||||
|
@ -144,9 +154,9 @@ public class TestMemStoreLAB {
|
|||
for (Map<Integer, AllocRecord> allocsInChunk : mapsByChunk.values()) {
|
||||
int expectedOff = 0;
|
||||
for (AllocRecord alloc : allocsInChunk.values()) {
|
||||
assertEquals(expectedOff, alloc.alloc.getOffset());
|
||||
assertEquals(expectedOff, alloc.offset);
|
||||
assertTrue("Allocation overruns buffer",
|
||||
alloc.alloc.getOffset() + alloc.size <= alloc.alloc.getBytes().length);
|
||||
alloc.offset + alloc.size <= alloc.alloc.length);
|
||||
expectedOff += alloc.size;
|
||||
}
|
||||
}
|
||||
|
@ -173,8 +183,10 @@ public class TestMemStoreLAB {
|
|||
mslab = new HeapMemStoreLAB(conf);
|
||||
// launch multiple threads to trigger frequent chunk retirement
|
||||
List<Thread> threads = new ArrayList<Thread>();
|
||||
final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
|
||||
new byte[HeapMemStoreLAB.MAX_ALLOC_DEFAULT - 24]);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i));
|
||||
threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i, kv));
|
||||
}
|
||||
for (Thread thread : threads) {
|
||||
thread.start();
|
||||
|
@ -202,7 +214,8 @@ public class TestMemStoreLAB {
|
|||
+ " after mslab closed but actually: " + queueLength, queueLength == 0);
|
||||
}
|
||||
|
||||
private Thread getChunkQueueTestThread(final HeapMemStoreLAB mslab, String threadName) {
|
||||
private Thread getChunkQueueTestThread(final HeapMemStoreLAB mslab, String threadName,
|
||||
Cell cellToCopyInto) {
|
||||
Thread thread = new Thread() {
|
||||
boolean stopped = false;
|
||||
|
||||
|
@ -210,7 +223,7 @@ public class TestMemStoreLAB {
|
|||
public void run() {
|
||||
while (!stopped) {
|
||||
// keep triggering chunk retirement
|
||||
mslab.allocateBytes(HeapMemStoreLAB.MAX_ALLOC_DEFAULT - 1);
|
||||
mslab.copyCellInto(cellToCopyInto);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -225,28 +238,29 @@ public class TestMemStoreLAB {
|
|||
}
|
||||
|
||||
private static class AllocRecord implements Comparable<AllocRecord>{
|
||||
private final ByteRange alloc;
|
||||
private final byte[] alloc;
|
||||
private final int offset;
|
||||
private final int size;
|
||||
public AllocRecord(ByteRange alloc, int size) {
|
||||
|
||||
public AllocRecord(byte[] alloc, int offset, int size) {
|
||||
super();
|
||||
this.alloc = alloc;
|
||||
this.offset = offset;
|
||||
this.size = size;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(AllocRecord e) {
|
||||
if (alloc.getBytes() != e.alloc.getBytes()) {
|
||||
if (alloc != e.alloc) {
|
||||
throw new RuntimeException("Can only compare within a particular array");
|
||||
}
|
||||
return Ints.compare(alloc.getOffset(), e.alloc.getOffset());
|
||||
return Ints.compare(this.offset, e.offset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "AllocRecord(offset=" + alloc.getOffset() + ", size=" + size + ")";
|
||||
return "AllocRecord(offset=" + this.offset + ", size=" + size + ")";
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue