diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java
new file mode 100644
index 00000000000..b5fe03397a9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ClassSize;
+
+/**
+ * CSLMImmutableSegment extends the API supported by a {@link Segment} and an
+ * {@link ImmutableSegment}. This immutable segment works with a CellSet that uses a
+ * ConcurrentSkipListMap (CSLM) delegate.
+ */
+@InterfaceAudience.Private
+public class CSLMImmutableSegment extends ImmutableSegment {
+  public static final long DEEP_OVERHEAD_CSLM =
+      ImmutableSegment.DEEP_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP;
+
+  /**------------------------------------------------------------------------
+   * Copy c-tor to be used when a new CSLMImmutableSegment is built from a mutable one.
+   * This c-tor should be used when the active MutableSegment is pushed into the compaction
+   * pipeline and becomes an ImmutableSegment.
+   */
+  protected CSLMImmutableSegment(Segment segment) {
+    super(segment);
+    // update the segment metadata heap size
+    incSize(0, -MutableSegment.DEEP_OVERHEAD + DEEP_OVERHEAD_CSLM);
+  }
+
+  @Override
+  protected long indexEntrySize() {
+    return ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY;
+  }
+
+  @Override
+  protected boolean canBeFlattened() {
+    return true;
+  }
+}
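Review note: the `incSize(0, -MutableSegment.DEEP_OVERHEAD + DEEP_OVERHEAD_CSLM)` call above swaps only the fixed per-segment metadata overhead; the data size (the first argument) stays zero because the cells themselves do not move. A minimal sketch of that bookkeeping, with made-up constants standing in for the real `ClassSize`-derived overheads:

```java
// Minimal sketch of the overhead swap done when a MutableSegment becomes a
// CSLMImmutableSegment. The two constants are hypothetical placeholders for
// MutableSegment.DEEP_OVERHEAD and CSLMImmutableSegment.DEEP_OVERHEAD_CSLM.
public class OverheadSwapSketch {
  static final long MUTABLE_DEEP_OVERHEAD = 64;  // hypothetical
  static final long CSLM_DEEP_OVERHEAD = 80;     // hypothetical

  public static void main(String[] args) {
    long cellsHeap = 10_000;                                 // heap taken by the cells
    long heapSize = cellsHeap + MUTABLE_DEEP_OVERHEAD;       // active segment total
    heapSize += -MUTABLE_DEEP_OVERHEAD + CSLM_DEEP_OVERHEAD; // the incSize(0, ...) step
    System.out.println(heapSize); // 10080: only the metadata share changed
  }
}
```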
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
new file mode 100644
index 00000000000..fc3a652ec97
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
@@ -0,0 +1,132 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ClassSize;
+
+import java.io.IOException;
+
+/**
+ * CellArrayImmutableSegment extends the API supported by a {@link Segment} and an
+ * {@link ImmutableSegment}. This immutable segment works with a CellSet that uses a
+ * CellArrayMap delegate.
+ */
+@InterfaceAudience.Private
+public class CellArrayImmutableSegment extends ImmutableSegment {
+
+  public static final long DEEP_OVERHEAD_CAM = DEEP_OVERHEAD + ClassSize.CELL_ARRAY_MAP;
+
+  /////////////////////  CONSTRUCTORS  /////////////////////
+  /**------------------------------------------------------------------------
+   * C-tor to be used when a new CellArrayImmutableSegment is a result of compaction of a
+   * list of older ImmutableSegments.
+   * The given iterator returns the Cells that "survived" the compaction.
+   */
+  protected CellArrayImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
+      MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactor.Action action) {
+    super(null, comparator, memStoreLAB); // initialize the CellSet with NULL
+    incSize(0, DEEP_OVERHEAD_CAM);
+    // build the new CellSet based on CellArrayMap and update the CellSet of the new Segment
+    initializeCellSet(numOfCells, iterator, action);
+  }
+
+  /**------------------------------------------------------------------------
+   * C-tor to be used when a new CellArrayImmutableSegment is built as a result of flattening
+   * of a CSLMImmutableSegment. The Cells are taken from the CellSet of the given segment.
+   */
+  protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemstoreSize memstoreSize) {
+    super(segment); // initialize the superclass
+    incSize(0, DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM);
+    int numOfCells = segment.getCellsCount();
+    // build the new CellSet based on CellArrayMap and update the CellSet of this Segment
+    reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet());
+    // adjust the metadata size: subtract the SkipList per-entry overhead and add the
+    // CellArrayMap per-entry overhead (reinitializeCellSet does not update the sizes)
+    long newSegmentSizeDelta =
+        numOfCells * (indexEntrySize() - ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    incSize(0, newSegmentSizeDelta);
+    memstoreSize.incMemstoreSize(0, newSegmentSizeDelta);
+  }
+
+  @Override
+  protected long indexEntrySize() {
+    return ClassSize.CELL_ARRAY_MAP_ENTRY;
+  }
+
+  @Override
+  protected boolean canBeFlattened() {
+    return false;
+  }
+
+  /////////////////////  PRIVATE METHODS  /////////////////////
+  /*------------------------------------------------------------------------*/
+  // Create a CellSet based on CellArrayMap from a compacting iterator
+  private void initializeCellSet(int numOfCells, MemStoreSegmentsIterator iterator,
+      MemStoreCompactor.Action action) {
+
+    Cell[] cells = new Cell[numOfCells]; // build the Cell Array
+    int i = 0;
+    while (iterator.hasNext()) {
+      Cell c = iterator.next();
+      // The scanner behind the iterator is doing all the elimination logic
+      if (action == MemStoreCompactor.Action.MERGE) {
+        // for merge, move the Cell object as is, without an MSLAB copy;
+        // the sizes still need to be updated in the new segment
+        cells[i] = c;
+      } else {
+        // for compaction, copy the cell into the new segment (including an MSLAB copy)
+        cells[i] = maybeCloneWithAllocator(c);
+      }
+      // second parameter is true because in compaction/merge the addition of a cell to the
+      // new segment always succeeds
+      updateMetaInfo(c, true, null); // updates the size per cell
+      i++;
+    }
+    // build the immutable CellSet
+    CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, i, false);
+    this.setCellSet(null, new CellSet(cam)); // update the CellSet of this Segment
+  }
+
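Review note: the `newSegmentSizeDelta` computed in the flattening c-tor is worth spelling out. Flattening trades one ConcurrentSkipListMap entry per cell for one CellArrayMap entry per cell, so the heap delta is `numOfCells * (indexEntrySize() - ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY)`, normally negative, which is the point of flattening. A runnable illustration with placeholder entry sizes (the real values come from `ClassSize`):

```java
// Illustration of the per-cell index delta applied on flattening to CellArrayMap.
// Entry sizes are hypothetical; HBase takes them from ClassSize.
public class FlatteningDeltaSketch {
  public static void main(String[] args) {
    long skipListEntry = 40;  // placeholder for CONCURRENT_SKIPLISTMAP_ENTRY
    long cellArrayEntry = 8;  // placeholder for CELL_ARRAY_MAP_ENTRY
    int numOfCells = 1_000;
    long delta = numOfCells * (cellArrayEntry - skipListEntry);
    System.out.println(delta); // -32000: the flat index is cheaper per cell
  }
}
```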
+  /*------------------------------------------------------------------------*/
+  // Create a CellSet based on CellArrayMap from the current ConcurrentSkipListMap-based
+  // CellSet (without a compacting iterator)
+  private void reinitializeCellSet(
+      int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet) {
+    Cell[] cells = new Cell[numOfCells]; // build the Cell Array
+    Cell curCell;
+    int idx = 0;
+    try {
+      while ((curCell = segmentScanner.next()) != null) {
+        cells[idx++] = curCell;
+      }
+    } catch (IOException ie) {
+      throw new IllegalStateException(ie);
+    } finally {
+      segmentScanner.close();
+    }
+    // build the immutable CellSet
+    CellArrayMap cam = new CellArrayMap(CellComparator.COMPARATOR, cells, 0, idx, false);
+    this.setCellSet(oldCellSet, new CellSet(cam)); // update the CellSet of this Segment
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
new file mode 100644
index 00000000000..d4667e1c627
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
@@ -0,0 +1,195 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.ByteBufferKeyValue;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.ClassSize;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * CellChunkImmutableSegment extends the API supported by a {@link Segment} and an
+ * {@link ImmutableSegment}. This immutable segment works with a CellSet that uses a
+ * CellChunkMap delegate.
+ */
+@InterfaceAudience.Private
+public class CellChunkImmutableSegment extends ImmutableSegment {
+
+  public static final long DEEP_OVERHEAD_CCM =
+      ImmutableSegment.DEEP_OVERHEAD + ClassSize.CELL_CHUNK_MAP;
+
+  /////////////////////  CONSTRUCTORS  /////////////////////
+  /**------------------------------------------------------------------------
+   * C-tor to be used when a new CellChunkImmutableSegment is built as a result of
+   * compaction/merge of a list of older ImmutableSegments.
+   * The given iterator returns the Cells that "survived" the compaction.
+   */
+  protected CellChunkImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
+      MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactor.Action action) {
+    super(null, comparator, memStoreLAB); // initialize the CellSet with NULL
+    incSize(0, DEEP_OVERHEAD_CCM); // initiate the heapSize with the size of the segment metadata
+    // build the new CellSet based on CellChunkMap and update the CellSet of the new Segment
+    initializeCellSet(numOfCells, iterator, action);
+  }
+
+  /**------------------------------------------------------------------------
+   * C-tor to be used when a new CellChunkImmutableSegment is built as a result of flattening
+   * of a CSLMImmutableSegment. The Cells are taken from the CellSet of the given segment.
+   */
+  protected CellChunkImmutableSegment(CSLMImmutableSegment segment, MemstoreSize memstoreSize) {
+    super(segment); // initialize the superclass
+    incSize(0, -CSLMImmutableSegment.DEEP_OVERHEAD_CSLM + DEEP_OVERHEAD_CCM);
+    int numOfCells = segment.getCellsCount();
+    // build the new CellSet based on CellChunkMap
+    reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet());
+    // adjust the metadata size: subtract the SkipList per-entry overhead, add the
+    // CellChunkMap per-entry overhead, and also subtract the Cell object sizes
+    // (reinitializeCellSet does not update the sizes)
+    long newSegmentSizeDelta =
+        numOfCells * (indexEntrySize() - ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    incSize(0, newSegmentSizeDelta);
+    memstoreSize.incMemstoreSize(0, newSegmentSizeDelta);
+  }
+
+  @Override
+  protected long indexEntrySize() {
+    return (ClassSize.CELL_CHUNK_MAP_ENTRY - KeyValue.FIXED_OVERHEAD);
+  }
+
+  @Override
+  protected boolean canBeFlattened() {
+    return false;
+  }
+
+  /////////////////////  PRIVATE METHODS  /////////////////////
+  /*------------------------------------------------------------------------*/
+  // Create a CellSet based on CellChunkMap from a compacting iterator
+  private void initializeCellSet(int numOfCells, MemStoreSegmentsIterator iterator,
+      MemStoreCompactor.Action action) {
+
+    // calculate how many chunks we will need for the index
+    int chunkSize = ChunkCreator.getInstance().getChunkSize();
+    int numOfCellsInChunk = CellChunkMap.NUM_OF_CELL_REPS_IN_CHUNK;
+    int numberOfChunks = calculateNumberOfChunks(numOfCells, numOfCellsInChunk);
+    int numOfCellsAfterCompaction = 0;
+    int currentChunkIdx = 0;
+    int offsetInCurrentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
+    // all index chunks are allocated from the ChunkCreator
+    Chunk[] chunks = new Chunk[numberOfChunks];
+    for (int i = 0; i < numberOfChunks; i++) {
+      chunks[i] = this.getMemStoreLAB().getNewExternalChunk();
+    }
+    while (iterator.hasNext()) { // the iterator hides the elimination logic for compaction
+      Cell c = iterator.next();
+      numOfCellsAfterCompaction++;
+      assert (c instanceof ByteBufferKeyValue); // we should see nothing but ByteBufferKeyValues
+      if (offsetInCurrentChunk + ClassSize.CELL_CHUNK_MAP_ENTRY > chunkSize) {
+        currentChunkIdx++; // continue to the next index chunk
+        offsetInCurrentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
+      }
+      if (action == MemStoreCompactor.Action.COMPACT) {
+        c = maybeCloneWithAllocator(c); // for compaction, copy the cell into the new segment (MSLAB copy)
+      }
+      offsetInCurrentChunk = // add the Cell reference to the index chunk
+          createCellReference((ByteBufferKeyValue) c, chunks[currentChunkIdx].getData(),
+              offsetInCurrentChunk);
+      // the sizes still need to be updated in the new segment;
+      // second parameter is true because in compaction/merge the addition of a cell to the
+      // new segment always succeeds
+      updateMetaInfo(c, true, null); // updates the size per cell
+    }
+    // build the immutable CellSet
+    CellChunkMap ccm =
+        new CellChunkMap(CellComparator.COMPARATOR, chunks, 0, numOfCellsAfterCompaction, false);
+    this.setCellSet(null, new CellSet(ccm)); // update the CellSet of this Segment
+  }
+
+  /*------------------------------------------------------------------------*/
+  // Create a CellSet based on CellChunkMap from the current ConcurrentSkipListMap-based
+  // CellSet (without a compacting iterator)
+  // This is a service for non-flat immutable segments.
+  // Assumption: cells do not exceed the chunk size!
+  private void reinitializeCellSet(
+      int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet) {
+    Cell curCell;
+    // calculate how many chunks we will need for the metadata
+    int chunkSize = ChunkCreator.getInstance().getChunkSize();
+    int numOfCellsInChunk = CellChunkMap.NUM_OF_CELL_REPS_IN_CHUNK;
+    int numberOfChunks = calculateNumberOfChunks(numOfCells, numOfCellsInChunk);
+    // all index chunks are allocated from the ChunkCreator
+    Chunk[] chunks = new Chunk[numberOfChunks];
+    for (int i = 0; i < numberOfChunks; i++) {
+      chunks[i] = this.getMemStoreLAB().getNewExternalChunk();
+    }
+
+    int currentChunkIdx = 0;
+    int offsetInCurrentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
+
+    try {
+      while ((curCell = segmentScanner.next()) != null) {
+        assert (curCell instanceof ByteBufferKeyValue); // we should see nothing but ByteBufferKeyValues
+        if (offsetInCurrentChunk + ClassSize.CELL_CHUNK_MAP_ENTRY > chunkSize) {
+          // continue to the next metadata chunk
+          currentChunkIdx++;
+          offsetInCurrentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
+        }
+        offsetInCurrentChunk =
+            createCellReference((ByteBufferKeyValue) curCell, chunks[currentChunkIdx].getData(),
+                offsetInCurrentChunk);
+      }
+    } catch (IOException ie) {
+      throw new IllegalStateException(ie);
+    } finally {
+      segmentScanner.close();
+    }
+
+    CellChunkMap ccm = new CellChunkMap(CellComparator.COMPARATOR, chunks, 0, numOfCells, false);
+    this.setCellSet(oldCellSet, new CellSet(ccm)); // update the CellSet of this Segment
+  }
+
+  /*------------------------------------------------------------------------*/
+  // for a given cell, write the cell representation into the index chunk
+  private int createCellReference(ByteBufferKeyValue cell, ByteBuffer idxBuffer, int idxOffset) {
+    int offset = idxOffset;
+    int dataChunkID = cell.getChunkId();
+
+    offset = ByteBufferUtils.putInt(idxBuffer, offset, dataChunkID);              // data chunk id
+    offset = ByteBufferUtils.putInt(idxBuffer, offset, cell.getOffset());         // offset
+    offset = ByteBufferUtils.putInt(idxBuffer, offset, KeyValueUtil.length(cell)); // length
+    offset = ByteBufferUtils.putLong(idxBuffer, offset, cell.getSequenceId());    // seqId
+
+    return offset;
+  }
+
+  private int calculateNumberOfChunks(int numOfCells, int numOfCellsInChunk) {
+    int numberOfChunks = numOfCells / numOfCellsInChunk;
+    if (numOfCells % numOfCellsInChunk != 0) { // if cells cannot be divided evenly between chunks
+      numberOfChunks++; // add one additional chunk
+    }
+    return numberOfChunks;
+  }
+}
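Two details of the file above that are easy to miss: `createCellReference` writes a fixed-size record per cell — chunk id (int), offset (int), length (int), sequence id (long), i.e. 3 * 4 + 8 = 20 bytes — and `calculateNumberOfChunks` is plain ceiling division. A standalone sketch of both, using a plain `ByteBuffer` instead of `ByteBufferUtils` and arbitrary values:

```java
import java.nio.ByteBuffer;

// Standalone sketch of the per-cell index record and the chunk-count math used above.
public class CellReferenceSketch {
  public static void main(String[] args) {
    ByteBuffer idx = ByteBuffer.allocate(64); // stands in for an index chunk
    int off = 0;
    idx.putInt(off, 42);      off += Integer.BYTES; // data chunk id
    idx.putInt(off, 128);     off += Integer.BYTES; // cell offset inside the data chunk
    idx.putInt(off, 57);      off += Integer.BYTES; // serialized cell length
    idx.putLong(off, 1_000L); off += Long.BYTES;    // sequence id
    System.out.println(off); // 20 bytes per cell reference

    // calculateNumberOfChunks, written as one ceiling division
    int numOfCells = 2_500;
    int cellsPerChunk = 1_024; // placeholder for CellChunkMap.NUM_OF_CELL_REPS_IN_CHUNK
    System.out.println((numOfCells + cellsPerChunk - 1) / cellsPerChunk); // 3 chunks
  }
}
```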
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
new file mode 100644
index 00000000000..6011af7808c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
@@ -0,0 +1,566 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.Threads;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Compacting memstore test case, covering the flat immutable segment indexes
+ * (CellChunkMap and CellArrayMap).
+ */
+@Category({RegionServerTests.class, MediumTests.class})
+@RunWith(Parameterized.class)
+public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore {
+  @Parameterized.Parameters
+  public static Object[] data() {
+    return new Object[] { "CHUNK_MAP", "ARRAY_MAP" }; // test different immutable indexes
+  }
+  private static final Log LOG = LogFactory.getLog(TestCompactingToCellFlatMapMemStore.class);
+  public final boolean toCellChunkMap;
+  Configuration conf;
+  //////////////////////////////////////////////////////////////////////////////
+  // Helpers
+  //////////////////////////////////////////////////////////////////////////////
+  public TestCompactingToCellFlatMapMemStore(String type) {
+    // compare with equals(); reference comparison of Strings only works by accident
+    toCellChunkMap = "CHUNK_MAP".equals(type);
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    chunkCreator.clearChunksInPool();
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    compactingSetUp();
+    this.conf = HBaseConfiguration.create();
+
+    // set memstore to do data compaction
+    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
+        String.valueOf(MemoryCompactionPolicy.EAGER));
+
+    this.memstore =
+        new CompactingMemStore(conf, CellComparator.COMPARATOR, store,
+            regionServicesForStores, MemoryCompactionPolicy.EAGER);
+  }
+
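Review note: the three-line index-type block below is repeated at the top of most tests; a small helper would keep it in one place. A sketch (the helper name is hypothetical; `setIndexType()` is the patched method the tests rely on to pick up the configuration change):

```java
// Hypothetical helper to avoid repeating the index-type setup in every test.
private void useChunkMapIndexIfRequested() {
  if (toCellChunkMap) {
    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
        String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
    ((CompactingMemStore) memstore).setIndexType(); // re-reads the index type from conf
  }
}
```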
+  //////////////////////////////////////////////////////////////////////////////
+  // Compaction tests
+  //////////////////////////////////////////////////////////////////////////////
+  public void testCompaction1Bucket() throws IOException {
+    int counter = 0;
+    String[] keys1 = { "A", "A", "B", "C" }; // A1, A2, B3, C4
+    if (toCellChunkMap) {
+      // set memstore to flatten into CellChunkMap
+      conf.set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
+          String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
+      ((CompactingMemStore) memstore).setIndexType();
+    }
+
+    // test 1 bucket
+    long totalCellsLen = addRowsByKeys(memstore, keys1);
+    long cellBeforeFlushSize = cellBeforeFlushSize();
+    long cellAfterFlushSize = cellAfterFlushSize();
+    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
+
+    assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
+
+    assertEquals(4, memstore.getActive().getCellsCount());
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    // One cell is duplicated and the compaction will remove it. All cells are of the same
+    // size, so adjust totalCellsLen accordingly.
+    totalCellsLen = (totalCellsLen * 3) / 4;
+    assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
+
+    totalHeapSize =
+        3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
+            + (toCellChunkMap ?
+            CellChunkImmutableSegment.DEEP_OVERHEAD_CCM :
+            CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
+    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
+    for (Segment s : memstore.getSegments()) {
+      counter += s.getCellsCount();
+    }
+    assertEquals(3, counter);
+    MemstoreSize size = memstore.getFlushableSize();
+    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
+    region.decrMemstoreSize(size); // simulate flusher
+    ImmutableSegment s = memstore.getSnapshot();
+    assertEquals(3, s.getCellsCount());
+    assertEquals(0, regionServicesForStores.getMemstoreSize());
+
+    memstore.clearSnapshot(snapshot.getId());
+  }
+
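One worked line for the heap math asserted above: before the in-memory flush the active segment holds `MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize`; afterwards the one duplicate is compacted away and the heap is `3 * cellAfterFlushSize` plus the flat map's deep overhead, while the `MutableSegment.DEEP_OVERHEAD` term survives because a fresh, empty active segment replaces the flushed one. In toy numbers (all constants hypothetical):

```java
// Toy version of testCompaction1Bucket's heap accounting; constants are made up.
public class HeapAccountingSketch {
  public static void main(String[] args) {
    long mutableOverhead = 64, flatMapOverhead = 80; // segment-level overheads
    long cellBefore = 112, cellAfter = 88;           // per-cell heap before/after flattening
    long beforeFlush = mutableOverhead + 4 * cellBefore;
    long afterCompact = 3 * cellAfter  // duplicate "A" removed, survivors re-indexed
        + flatMapOverhead              // CellArrayMap/CellChunkMap segment overhead
        + mutableOverhead;             // a new, empty active segment
    System.out.println(beforeFlush + " -> " + afterCompact);
  }
}
```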
+  public void testCompaction2Buckets() throws IOException {
+    if (toCellChunkMap) {
+      // set memstore to flatten into CellChunkMap
+      conf.set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
+          String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
+      ((CompactingMemStore) memstore).setIndexType();
+    }
+    String[] keys1 = { "A", "A", "B", "C" };
+    String[] keys2 = { "A", "B", "D" };
+
+    long totalCellsLen1 = addRowsByKeys(memstore, keys1); // INSERT 4
+    long cellBeforeFlushSize = cellBeforeFlushSize();
+    long cellAfterFlushSize = cellAfterFlushSize();
+    long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
+    assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
+
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
+    int counter = 0; // COMPACT 4->3
+    for (Segment s : memstore.getSegments()) {
+      counter += s.getCellsCount();
+    }
+    assertEquals(3, counter);
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    // One cell is duplicated and the compaction will remove it. All cells are of the same
+    // size, so adjust totalCellsLen accordingly.
+    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
+    totalHeapSize1 = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
+        + (toCellChunkMap ?
+        CellChunkImmutableSegment.DEEP_OVERHEAD_CCM :
+        CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
+    assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
+
+    long totalCellsLen2 = addRowsByKeys(memstore, keys2); // INSERT 3 (3+3=6)
+    long totalHeapSize2 = 3 * cellBeforeFlushSize;
+    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
+
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
+    assertEquals(0, memstore.getSnapshot().getCellsCount()); // COMPACT 6->4
+    counter = 0;
+    for (Segment s : memstore.getSegments()) {
+      counter += s.getCellsCount();
+    }
+    assertEquals(4, counter);
+    totalCellsLen2 = totalCellsLen2 / 3; // 2 of the 3 cells of set 2 are duplicates
+    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
+    totalHeapSize2 = 1 * cellAfterFlushSize;
+    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
+
+    MemstoreSize size = memstore.getFlushableSize();
+    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
+    region.decrMemstoreSize(size); // simulate flusher
+    ImmutableSegment s = memstore.getSnapshot();
+    assertEquals(4, s.getCellsCount());
+    assertEquals(0, regionServicesForStores.getMemstoreSize());
+
+    memstore.clearSnapshot(snapshot.getId());
+  }
+
+  public void testCompaction3Buckets() throws IOException {
+    if (toCellChunkMap) {
+      // set memstore to flatten into CellChunkMap
+      conf.set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
+          String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
+      ((CompactingMemStore) memstore).setIndexType();
+    }
+    String[] keys1 = { "A", "A", "B", "C" };
+    String[] keys2 = { "A", "B", "D" };
+    String[] keys3 = { "D", "B", "B" };
+
+    long totalCellsLen1 = addRowsByKeys(memstore, keys1);
+    long cellBeforeFlushSize = cellBeforeFlushSize();
+    long cellAfterFlushSize = cellAfterFlushSize();
+    long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
+    assertEquals(totalCellsLen1, region.getMemstoreSize());
+    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
+
+    MemstoreSize size = memstore.getFlushableSize();
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
+
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    // One cell is duplicated and the compaction will remove it. All cells are of the same
+    // size, so adjust totalCellsLen accordingly.
+    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
+    totalHeapSize1 = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
+        + (toCellChunkMap ?
+        CellChunkImmutableSegment.DEEP_OVERHEAD_CCM :
+        CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
+    assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
+
+    long totalCellsLen2 = addRowsByKeys(memstore, keys2);
+    long totalHeapSize2 = 3 * cellBeforeFlushSize;
+
+    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
+
+    ((CompactingMemStore) memstore).disableCompaction();
+    size = memstore.getFlushableSize();
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
+    totalHeapSize2 = totalHeapSize2 + CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
+
+    long totalCellsLen3 = addRowsByKeys(memstore, keys3);
+    long totalHeapSize3 = 3 * cellBeforeFlushSize;
+    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
+        regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapSize1 + totalHeapSize2 + totalHeapSize3,
+        ((CompactingMemStore) memstore).heapSize());
+
+    ((CompactingMemStore) memstore).enableCompaction();
+    size = memstore.getFlushableSize();
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
+    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells.
+    // Out of the total of 10 cells, only 4 are unique.
+    totalCellsLen2 = totalCellsLen2 / 3; // 2 out of 3 cells are duplicates
+    totalCellsLen3 = 0; // all duplicated cells
+    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
+        regionServicesForStores.getMemstoreSize());
+    // Only 4 unique cells left
+    long totalHeapSize4 = 4 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
+        + (toCellChunkMap ?
+        CellChunkImmutableSegment.DEEP_OVERHEAD_CCM :
+        CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
+    assertEquals(totalHeapSize4, ((CompactingMemStore) memstore).heapSize());
+
+    size = memstore.getFlushableSize();
+    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
+    region.decrMemstoreSize(size); // simulate flusher
+    ImmutableSegment s = memstore.getSnapshot();
+    assertEquals(4, s.getCellsCount());
+    assertEquals(0, regionServicesForStores.getMemstoreSize());
+
+    memstore.clearSnapshot(snapshot.getId());
+  }
+
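The middle of testCompaction3Buckets is the one place the CSLM flattening overhead shows up: with compaction disabled the flushed segment stays a CSLMImmutableSegment, so the expected heap grows by `DEEP_OVERHEAD_CSLM` instead of shrinking per cell. In toy numbers (constants made up, mirroring the `totalHeapSize2` adjustment above):

```java
// Toy heap tally while compaction is disabled: the pushed segment keeps its
// skip-list index, so DEEP_OVERHEAD_CSLM is added on top of the cell costs.
public class DisabledCompactionSketch {
  public static void main(String[] args) {
    long cellBefore = 112, cslmOverhead = 96; // hypothetical values
    long totalHeapSize2 = 3 * cellBefore;     // keys2 still at before-flush cost
    totalHeapSize2 += cslmOverhead;           // segment became a CSLMImmutableSegment
    System.out.println(totalHeapSize2);
  }
}
```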
+  //////////////////////////////////////////////////////////////////////////////
+  // Merging tests
+  //////////////////////////////////////////////////////////////////////////////
+  @Test
+  public void testMerging() throws IOException {
+    if (toCellChunkMap) {
+      // set memstore to flatten into CellChunkMap
+      conf.set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
+          String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
+      ((CompactingMemStore) memstore).setIndexType();
+    }
+    String[] keys1 = { "A", "A", "B", "C", "F", "H" };
+    String[] keys2 = { "A", "B", "D", "G", "I", "J" };
+    String[] keys3 = { "D", "B", "B", "E" };
+
+    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
+    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
+        String.valueOf(compactionType));
+    ((CompactingMemStore) memstore).initiateType(compactionType);
+    addRowsByKeys(memstore, keys1);
+
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline; should not compact
+    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+
+    addRowsByKeys(memstore, keys2); // also should only flatten
+
+    int counter2 = 0;
+    for (Segment s : memstore.getSegments()) {
+      counter2 += s.getCellsCount();
+    }
+    assertEquals(12, counter2);
+
+    ((CompactingMemStore) memstore).disableCompaction();
+
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+
+    int counter3 = 0;
+    for (Segment s : memstore.getSegments()) {
+      counter3 += s.getCellsCount();
+    }
+    assertEquals(12, counter3);
+
+    addRowsByKeys(memstore, keys3);
+
+    int counter4 = 0;
+    for (Segment s : memstore.getSegments()) {
+      counter4 += s.getCellsCount();
+    }
+    assertEquals(16, counter4);
+
+    ((CompactingMemStore) memstore).enableCompaction();
+
+    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and merge
+    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+
+    int counter = 0;
+    for (Segment s : memstore.getSegments()) {
+      counter += s.getCellsCount();
+    }
+    // merging does not remove duplicates, so all 16 cells are still there
+    assertEquals(16, counter);
+
+    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
+    ImmutableSegment s = memstore.getSnapshot();
+    memstore.clearSnapshot(snapshot.getId());
+  }
+
+  @Test
+  public void testCountOfCellsAfterFlatteningByScan() throws IOException {
+    String[] keys1 = { "A", "B", "C" }; // A, B, C
+    addRowsByKeysWith50Cols(memstore, keys1);
+    // this should only flatten as there are no duplicates
+    ((CompactingMemStore) memstore).flushInMemory();
+    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
+    // seek
+    int count = 0;
+    for (int i = 0; i < scanners.size(); i++) {
+      scanners.get(i).seek(KeyValue.LOWESTKEY);
+      while (scanners.get(i).next() != null) {
+        count++;
+      }
+    }
+    assertEquals("wrong count of cells", 150, count); // expected value goes first
+    for (int i = 0; i < scanners.size(); i++) {
+      scanners.get(i).close();
+    }
+  }
+
+  @Test
+  public void testCountOfCellsAfterFlatteningByIterator() throws IOException {
+    String[] keys1 = { "A", "B", "C" }; // A, B, C
+    addRowsByKeysWith50Cols(memstore, keys1);
+    // this should only flatten as there are no duplicates
+    ((CompactingMemStore) memstore).flushInMemory();
+    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    // Just doing the count operation here
+    MemStoreSegmentsIterator itr = new MemStoreMergerSegmentsIterator(
+        ((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(),
+        CellComparator.COMPARATOR, 10);
+    int cnt = 0;
+    try {
+      while (itr.next() != null) {
+        cnt++;
+      }
+    } finally {
+      itr.close();
+    }
+    assertEquals("wrong count of cells", 150, cnt); // expected value goes first
+  }
+
+  private void addRowsByKeysWith50Cols(AbstractMemStore hmc, String[] keys) {
+    byte[] fam = Bytes.toBytes("testfamily");
+    for (int i = 0; i < keys.length; i++) {
+      long timestamp = System.currentTimeMillis();
+      Threads.sleep(1); // to make sure each kv gets a different ts
+      byte[] row = Bytes.toBytes(keys[i]);
+      for (int j = 0; j < 50; j++) {
+        byte[] qf = Bytes.toBytes("testqualifier" + j);
+        byte[] val = Bytes.toBytes(keys[i] + j);
+        KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
+        hmc.add(kv, null);
+      }
+    }
+  }
+
+  @Override
+  @Test
+  public void testPuttingBackChunksWithOpeningScanner() throws IOException {
+    byte[] row = Bytes.toBytes("testrow");
+    byte[] fam = Bytes.toBytes("testfamily");
+    byte[] qf1 = Bytes.toBytes("testqualifier1");
+    byte[] qf2 = Bytes.toBytes("testqualifier2");
+    byte[] qf3 = Bytes.toBytes("testqualifier3");
+    byte[] qf4 = Bytes.toBytes("testqualifier4");
+    byte[] qf5 = Bytes.toBytes("testqualifier5");
+    byte[] qf6 = Bytes.toBytes("testqualifier6");
+    byte[] qf7 = Bytes.toBytes("testqualifier7");
+    byte[] val = Bytes.toBytes("testval");
+
+    // Setting up memstore
+    memstore.add(new KeyValue(row, fam, qf1, val), null);
+    memstore.add(new KeyValue(row, fam, qf2, val), null);
+    memstore.add(new KeyValue(row, fam, qf3, val), null);
+
+    // Creating a snapshot
+    MemStoreSnapshot snapshot = memstore.snapshot();
+    assertEquals(3, memstore.getSnapshot().getCellsCount());
+
+    // Adding value to "new" memstore
+    assertEquals(0, memstore.getActive().getCellsCount());
+    memstore.add(new KeyValue(row, fam, qf4, val), null);
+    memstore.add(new KeyValue(row, fam, qf5, val), null);
+    assertEquals(2, memstore.getActive().getCellsCount());
+
+    // open scanners before clearing the snapshot
+    List<KeyValueScanner> scanners = memstore.getScanners(0);
+    // the chunks should not be put back into the pool yet, since scanners are
+    // still open over their data
+    // close the snapshot scanners
+    for (KeyValueScanner scanner : snapshot.getScanners()) {
+      scanner.close();
+    }
+    memstore.clearSnapshot(snapshot.getId());
+
+    assertTrue(chunkCreator.getPoolSize() == 0);
+
+    // chunks will be put back into the pool after the scanners are closed
+    for (KeyValueScanner scanner : scanners) {
+      scanner.close();
+    }
+    assertTrue(chunkCreator.getPoolSize() > 0);
+
+    // clear chunks
+    chunkCreator.clearChunksInPool();
+
+    // Creating another snapshot
+    snapshot = memstore.snapshot();
+    // Adding more values
+    memstore.add(new KeyValue(row, fam, qf6, val), null);
+    memstore.add(new KeyValue(row, fam, qf7, val), null);
+    // opening scanners
+    scanners =
+        memstore.getScanners(0);
+    // close scanners before clearing the snapshot
+    for (KeyValueScanner scanner : scanners) {
+      scanner.close();
+    }
+    // since no scanner is open, the snapshot's chunks should be put back into the pool
+    // close the snapshot scanners
+    for (KeyValueScanner scanner : snapshot.getScanners()) {
+      scanner.close();
+    }
+    memstore.clearSnapshot(snapshot.getId());
+    assertTrue(chunkCreator.getPoolSize() > 0);
+  }
+
+  @Test
+  public void testPuttingBackChunksAfterFlushing() throws IOException {
+    byte[] row = Bytes.toBytes("testrow");
+    byte[] fam = Bytes.toBytes("testfamily");
+    byte[] qf1 = Bytes.toBytes("testqualifier1");
+    byte[] qf2 = Bytes.toBytes("testqualifier2");
+    byte[] qf3 = Bytes.toBytes("testqualifier3");
+    byte[] qf4 = Bytes.toBytes("testqualifier4");
+    byte[] qf5 = Bytes.toBytes("testqualifier5");
+    byte[] val = Bytes.toBytes("testval");
+
+    // Setting up memstore
+    memstore.add(new KeyValue(row, fam, qf1, val), null);
+    memstore.add(new KeyValue(row, fam, qf2, val), null);
+    memstore.add(new KeyValue(row, fam, qf3, val), null);
+
+    // Creating a snapshot
+    MemStoreSnapshot snapshot = memstore.snapshot();
+    assertEquals(3, memstore.getSnapshot().getCellsCount());
+
+    // Adding value to "new" memstore
+    assertEquals(0, memstore.getActive().getCellsCount());
+    memstore.add(new KeyValue(row, fam, qf4, val), null);
+    memstore.add(new KeyValue(row, fam, qf5, val), null);
+    assertEquals(2, memstore.getActive().getCellsCount());
+    // close the snapshot scanners
+    for (KeyValueScanner scanner : snapshot.getScanners()) {
+      scanner.close();
+    }
+    memstore.clearSnapshot(snapshot.getId());
+
+    int chunkCount = chunkCreator.getPoolSize();
+    assertTrue(chunkCount > 0);
+  }
+
+  private long addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
+    byte[] fam = Bytes.toBytes("testfamily");
+    byte[] qf = Bytes.toBytes("testqualifier");
+    MemstoreSize memstoreSize = new MemstoreSize();
+    for (int i = 0; i < keys.length; i++) {
+      long timestamp = System.currentTimeMillis();
+      Threads.sleep(1); // to make sure each kv gets a different ts
+      byte[] row = Bytes.toBytes(keys[i]);
+      byte[] val = Bytes.toBytes(keys[i] + i);
+      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
+      hmc.add(kv, memstoreSize);
+      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp: " + kv.getTimestamp());
+    }
+    regionServicesForStores.addMemstoreSize(memstoreSize);
+    return memstoreSize.getDataSize();
+  }
+
+  private long cellBeforeFlushSize() {
+    // make one cell
+    byte[] row = Bytes.toBytes("A");
+    byte[] val = Bytes.toBytes("A" + 0);
+    KeyValue kv =
+        new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
+            System.currentTimeMillis(), val);
+    return ClassSize.align(
+        ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + KeyValueUtil.length(kv));
+  }
+
+  private long cellAfterFlushSize() {
+    // make one cell
+    byte[] row = Bytes.toBytes("A");
+    byte[] val = Bytes.toBytes("A" + 0);
+    KeyValue kv =
+        new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
+            System.currentTimeMillis(), val);
+
+    return toCellChunkMap ?
+        ClassSize.align(
+            ClassSize.CELL_CHUNK_MAP_ENTRY + KeyValueUtil.length(kv)) :
+        ClassSize.align(
+            ClassSize.CELL_ARRAY_MAP_ENTRY + KeyValue.FIXED_OVERHEAD + KeyValueUtil.length(kv));
+  }
+}
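Finally, the two size helpers at the bottom encode the whole point of this patch: before flattening, a cell costs a skip-list entry plus the KeyValue object overhead plus its serialized length; after flattening, the skip-list entry is replaced by a CellArrayMap entry, or, for CellChunkMap, the KeyValue object overhead disappears entirely because the cell is addressed through the 20-byte chunk reference. A compilable restatement with placeholder constants (`ClassSize.align` omitted for brevity):

```java
// Restatement of cellBeforeFlushSize()/cellAfterFlushSize() with placeholder
// constants instead of the real ClassSize/KeyValue values; structure matches the test.
public class PerCellCostSketch {
  static final long CSLM_ENTRY = 40, CELL_ARRAY_ENTRY = 8, CELL_CHUNK_ENTRY = 20;
  static final long KV_FIXED_OVERHEAD = 48; // all values hypothetical

  static long before(long kvLen) { return CSLM_ENTRY + KV_FIXED_OVERHEAD + kvLen; }

  static long after(long kvLen, boolean chunkMap) {
    return chunkMap
        ? CELL_CHUNK_ENTRY + kvLen                      // no KeyValue object is kept
        : CELL_ARRAY_ENTRY + KV_FIXED_OVERHEAD + kvLen; // the array keeps the object
  }

  public static void main(String[] args) {
    long kvLen = 60;
    System.out.println(before(kvLen) - after(kvLen, true));  // savings with CellChunkMap
    System.out.println(before(kvLen) - after(kvLen, false)); // savings with CellArrayMap
  }
}
```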