HBASE-18010: CellChunkMap integration into CompactingMemStore, merge to Release 2.0, add new files
This commit is contained in:
parent
95405fbd83
commit
445c5bd7b7
|
@ -0,0 +1,53 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.util.ClassSize;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* CSLMImmutableSegment is an abstract class that extends the API supported by a {@link Segment},
|
||||||
|
* and {@link ImmutableSegment}. This immutable segment is working with CellSet with
|
||||||
|
* ConcurrentSkipListMap (CSLM) delegatee.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class CSLMImmutableSegment extends ImmutableSegment {
|
||||||
|
public static final long DEEP_OVERHEAD_CSLM =
|
||||||
|
ImmutableSegment.DEEP_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP;
|
||||||
|
|
||||||
|
/**------------------------------------------------------------------------
|
||||||
|
* Copy C-tor to be used when new CSLMImmutableSegment is being built from a Mutable one.
|
||||||
|
* This C-tor should be used when active MutableSegment is pushed into the compaction
|
||||||
|
* pipeline and becomes an ImmutableSegment.
|
||||||
|
*/
|
||||||
|
protected CSLMImmutableSegment(Segment segment) {
|
||||||
|
super(segment);
|
||||||
|
// update the segment metadata heap size
|
||||||
|
incSize(0, -MutableSegment.DEEP_OVERHEAD + DEEP_OVERHEAD_CSLM);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected long indexEntrySize() {
|
||||||
|
return ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override protected boolean canBeFlattened() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,132 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.CellComparator;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.util.ClassSize;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* CellArrayImmutableSegment extends the API supported by a {@link Segment},
|
||||||
|
* and {@link ImmutableSegment}. This immutable segment is working with CellSet with
|
||||||
|
* CellArrayMap delegatee.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class CellArrayImmutableSegment extends ImmutableSegment {
|
||||||
|
|
||||||
|
public static final long DEEP_OVERHEAD_CAM = DEEP_OVERHEAD + ClassSize.CELL_ARRAY_MAP;
|
||||||
|
|
||||||
|
///////////////////// CONSTRUCTORS /////////////////////
|
||||||
|
/**------------------------------------------------------------------------
|
||||||
|
* C-tor to be used when new CellArrayImmutableSegment is a result of compaction of a
|
||||||
|
* list of older ImmutableSegments.
|
||||||
|
* The given iterator returns the Cells that "survived" the compaction.
|
||||||
|
*/
|
||||||
|
protected CellArrayImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
|
||||||
|
MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactor.Action action) {
|
||||||
|
super(null, comparator, memStoreLAB); // initiailize the CellSet with NULL
|
||||||
|
incSize(0, DEEP_OVERHEAD_CAM);
|
||||||
|
// build the new CellSet based on CellArrayMap and update the CellSet of the new Segment
|
||||||
|
initializeCellSet(numOfCells, iterator, action);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**------------------------------------------------------------------------
|
||||||
|
* C-tor to be used when new CellChunkImmutableSegment is built as a result of flattening
|
||||||
|
* of CSLMImmutableSegment
|
||||||
|
* The given iterator returns the Cells that "survived" the compaction.
|
||||||
|
*/
|
||||||
|
protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemstoreSize memstoreSize) {
|
||||||
|
super(segment); // initiailize the upper class
|
||||||
|
incSize(0, DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM);
|
||||||
|
int numOfCells = segment.getCellsCount();
|
||||||
|
// build the new CellSet based on CellChunkMap and update the CellSet of this Segment
|
||||||
|
reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet());
|
||||||
|
// arrange the meta-data size, decrease all meta-data sizes related to SkipList;
|
||||||
|
// add sizes of CellArrayMap entry (reinitializeCellSet doesn't take the care for the sizes)
|
||||||
|
long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
|
||||||
|
incSize(0, newSegmentSizeDelta);
|
||||||
|
memstoreSize.incMemstoreSize(0, newSegmentSizeDelta);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected long indexEntrySize() {
|
||||||
|
return ClassSize.CELL_ARRAY_MAP_ENTRY;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean canBeFlattened() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
///////////////////// PRIVATE METHODS /////////////////////
|
||||||
|
/*------------------------------------------------------------------------*/
|
||||||
|
// Create CellSet based on CellArrayMap from compacting iterator
|
||||||
|
private void initializeCellSet(int numOfCells, MemStoreSegmentsIterator iterator,
|
||||||
|
MemStoreCompactor.Action action) {
|
||||||
|
|
||||||
|
Cell[] cells = new Cell[numOfCells]; // build the Cell Array
|
||||||
|
int i = 0;
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
Cell c = iterator.next();
|
||||||
|
// The scanner behind the iterator is doing all the elimination logic
|
||||||
|
if (action == MemStoreCompactor.Action.MERGE) {
|
||||||
|
// if this is merge we just move the Cell object without copying MSLAB
|
||||||
|
// the sizes still need to be updated in the new segment
|
||||||
|
cells[i] = c;
|
||||||
|
} else {
|
||||||
|
// now we just copy it to the new segment (also MSLAB copy)
|
||||||
|
cells[i] = maybeCloneWithAllocator(c);
|
||||||
|
}
|
||||||
|
// second parameter true, because in compaction/merge the addition of the cell to new segment
|
||||||
|
// is always successful
|
||||||
|
updateMetaInfo(c, true, null); // updates the size per cell
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
// build the immutable CellSet
|
||||||
|
CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, i, false);
|
||||||
|
this.setCellSet(null, new CellSet(cam)); // update the CellSet of this Segment
|
||||||
|
}
|
||||||
|
|
||||||
|
/*------------------------------------------------------------------------*/
|
||||||
|
// Create CellSet based on CellChunkMap from current ConcurrentSkipListMap based CellSet
|
||||||
|
// (without compacting iterator)
|
||||||
|
// We do not consider cells bigger than chunks!
|
||||||
|
private void reinitializeCellSet(
|
||||||
|
int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet) {
|
||||||
|
Cell[] cells = new Cell[numOfCells]; // build the Cell Array
|
||||||
|
Cell curCell;
|
||||||
|
int idx = 0;
|
||||||
|
try {
|
||||||
|
while ((curCell = segmentScanner.next()) != null) {
|
||||||
|
cells[idx++] = curCell;
|
||||||
|
}
|
||||||
|
} catch (IOException ie) {
|
||||||
|
throw new IllegalStateException(ie);
|
||||||
|
} finally {
|
||||||
|
segmentScanner.close();
|
||||||
|
}
|
||||||
|
// build the immutable CellSet
|
||||||
|
CellArrayMap cam = new CellArrayMap(CellComparator.COMPARATOR, cells, 0, idx, false);
|
||||||
|
this.setCellSet(oldCellSet, new CellSet(cam)); // update the CellSet of this Segment
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,195 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.ByteBufferKeyValue;
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.CellComparator;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.util.ByteBufferUtils;
|
||||||
|
import org.apache.hadoop.hbase.util.ClassSize;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* CellChunkImmutableSegment extends the API supported by a {@link Segment},
|
||||||
|
* and {@link ImmutableSegment}. This immutable segment is working with CellSet with
|
||||||
|
* CellChunkMap delegatee.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class CellChunkImmutableSegment extends ImmutableSegment {
|
||||||
|
|
||||||
|
public static final long DEEP_OVERHEAD_CCM =
|
||||||
|
ImmutableSegment.DEEP_OVERHEAD + ClassSize.CELL_CHUNK_MAP;
|
||||||
|
|
||||||
|
///////////////////// CONSTRUCTORS /////////////////////
|
||||||
|
/**------------------------------------------------------------------------
|
||||||
|
* C-tor to be used when new CellChunkImmutableSegment is built as a result of compaction/merge
|
||||||
|
* of a list of older ImmutableSegments.
|
||||||
|
* The given iterator returns the Cells that "survived" the compaction.
|
||||||
|
*/
|
||||||
|
protected CellChunkImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
|
||||||
|
MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactor.Action action) {
|
||||||
|
super(null, comparator, memStoreLAB); // initialize the CellSet with NULL
|
||||||
|
incSize(0, DEEP_OVERHEAD_CCM); // initiate the heapSize with the size of the segment metadata
|
||||||
|
// build the new CellSet based on CellArrayMap and update the CellSet of the new Segment
|
||||||
|
initializeCellSet(numOfCells, iterator, action);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**------------------------------------------------------------------------
|
||||||
|
* C-tor to be used when new CellChunkImmutableSegment is built as a result of flattening
|
||||||
|
* of CSLMImmutableSegment
|
||||||
|
* The given iterator returns the Cells that "survived" the compaction.
|
||||||
|
*/
|
||||||
|
protected CellChunkImmutableSegment(CSLMImmutableSegment segment, MemstoreSize memstoreSize) {
|
||||||
|
super(segment); // initiailize the upper class
|
||||||
|
incSize(0,-CSLMImmutableSegment.DEEP_OVERHEAD_CSLM+ CellChunkImmutableSegment.DEEP_OVERHEAD_CCM);
|
||||||
|
int numOfCells = segment.getCellsCount();
|
||||||
|
// build the new CellSet based on CellChunkMap
|
||||||
|
reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet());
|
||||||
|
// arrange the meta-data size, decrease all meta-data sizes related to SkipList;
|
||||||
|
// add sizes of CellChunkMap entry, decrease also Cell object sizes
|
||||||
|
// (reinitializeCellSet doesn't take the care for the sizes)
|
||||||
|
long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
|
||||||
|
|
||||||
|
incSize(0, newSegmentSizeDelta);
|
||||||
|
memstoreSize.incMemstoreSize(0, newSegmentSizeDelta);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected long indexEntrySize() {
|
||||||
|
return (ClassSize.CELL_CHUNK_MAP_ENTRY - KeyValue.FIXED_OVERHEAD);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean canBeFlattened() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
///////////////////// PRIVATE METHODS /////////////////////
|
||||||
|
/*------------------------------------------------------------------------*/
|
||||||
|
// Create CellSet based on CellChunkMap from compacting iterator
|
||||||
|
private void initializeCellSet(int numOfCells, MemStoreSegmentsIterator iterator,
|
||||||
|
MemStoreCompactor.Action action) {
|
||||||
|
|
||||||
|
// calculate how many chunks we will need for index
|
||||||
|
int chunkSize = ChunkCreator.getInstance().getChunkSize();
|
||||||
|
int numOfCellsInChunk = CellChunkMap.NUM_OF_CELL_REPS_IN_CHUNK;
|
||||||
|
int numberOfChunks = calculateNumberOfChunks(numOfCells,numOfCellsInChunk);
|
||||||
|
int numOfCellsAfterCompaction = 0;
|
||||||
|
int currentChunkIdx = 0;
|
||||||
|
int offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
|
||||||
|
// all index Chunks are allocated from ChunkCreator
|
||||||
|
Chunk[] chunks = new Chunk[numberOfChunks];
|
||||||
|
for (int i=0; i < numberOfChunks; i++) {
|
||||||
|
chunks[i] = this.getMemStoreLAB().getNewExternalChunk();
|
||||||
|
}
|
||||||
|
while (iterator.hasNext()) { // the iterator hides the elimination logic for compaction
|
||||||
|
Cell c = iterator.next();
|
||||||
|
numOfCellsAfterCompaction++;
|
||||||
|
assert (c instanceof ByteBufferKeyValue); // shouldn't get here anything but ByteBufferKeyValue
|
||||||
|
if (offsetInCurentChunk + ClassSize.CELL_CHUNK_MAP_ENTRY > chunkSize) {
|
||||||
|
currentChunkIdx++; // continue to the next index chunk
|
||||||
|
offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
|
||||||
|
}
|
||||||
|
if (action == MemStoreCompactor.Action.COMPACT) {
|
||||||
|
c = maybeCloneWithAllocator(c); // for compaction copy cell to the new segment (MSLAB copy)
|
||||||
|
}
|
||||||
|
offsetInCurentChunk = // add the Cell reference to the index chunk
|
||||||
|
createCellReference((ByteBufferKeyValue)c, chunks[currentChunkIdx].getData(),
|
||||||
|
offsetInCurentChunk);
|
||||||
|
// the sizes still need to be updated in the new segment
|
||||||
|
// second parameter true, because in compaction/merge the addition of the cell to new segment
|
||||||
|
// is always successful
|
||||||
|
updateMetaInfo(c, true, null); // updates the size per cell
|
||||||
|
}
|
||||||
|
// build the immutable CellSet
|
||||||
|
CellChunkMap ccm =
|
||||||
|
new CellChunkMap(CellComparator.COMPARATOR,chunks,0,numOfCellsAfterCompaction,false);
|
||||||
|
this.setCellSet(null, new CellSet(ccm)); // update the CellSet of this Segment
|
||||||
|
}
|
||||||
|
|
||||||
|
/*------------------------------------------------------------------------*/
|
||||||
|
// Create CellSet based on CellChunkMap from current ConcurrentSkipListMap based CellSet
|
||||||
|
// (without compacting iterator)
|
||||||
|
// This is a service for not-flat immutable segments
|
||||||
|
// Assumption: cells do not exceed chunk size!
|
||||||
|
private void reinitializeCellSet(
|
||||||
|
int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet) {
|
||||||
|
Cell curCell;
|
||||||
|
// calculate how many chunks we will need for metadata
|
||||||
|
int chunkSize = ChunkCreator.getInstance().getChunkSize();
|
||||||
|
int numOfCellsInChunk = CellChunkMap.NUM_OF_CELL_REPS_IN_CHUNK;
|
||||||
|
int numberOfChunks = calculateNumberOfChunks(numOfCells,numOfCellsInChunk);
|
||||||
|
// all index Chunks are allocated from ChunkCreator
|
||||||
|
Chunk[] chunks = new Chunk[numberOfChunks];
|
||||||
|
for (int i=0; i < numberOfChunks; i++) {
|
||||||
|
chunks[i] = this.getMemStoreLAB().getNewExternalChunk();
|
||||||
|
}
|
||||||
|
|
||||||
|
int currentChunkIdx = 0;
|
||||||
|
int offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
|
||||||
|
|
||||||
|
try {
|
||||||
|
while ((curCell = segmentScanner.next()) != null) {
|
||||||
|
assert (curCell instanceof ByteBufferKeyValue); // shouldn't get here anything but ByteBufferKeyValue
|
||||||
|
if (offsetInCurentChunk + ClassSize.CELL_CHUNK_MAP_ENTRY > chunkSize) {
|
||||||
|
// continue to the next metadata chunk
|
||||||
|
currentChunkIdx++;
|
||||||
|
offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
|
||||||
|
}
|
||||||
|
offsetInCurentChunk =
|
||||||
|
createCellReference((ByteBufferKeyValue) curCell, chunks[currentChunkIdx].getData(),
|
||||||
|
offsetInCurentChunk);
|
||||||
|
}
|
||||||
|
} catch (IOException ie) {
|
||||||
|
throw new IllegalStateException(ie);
|
||||||
|
} finally {
|
||||||
|
segmentScanner.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
CellChunkMap ccm = new CellChunkMap(CellComparator.COMPARATOR,chunks,0,numOfCells,false);
|
||||||
|
this.setCellSet(oldCellSet, new CellSet(ccm)); // update the CellSet of this Segment
|
||||||
|
}
|
||||||
|
|
||||||
|
/*------------------------------------------------------------------------*/
|
||||||
|
// for a given cell, write the cell representation on the index chunk
|
||||||
|
private int createCellReference(ByteBufferKeyValue cell, ByteBuffer idxBuffer, int idxOffset) {
|
||||||
|
int offset = idxOffset;
|
||||||
|
int dataChunkID = cell.getChunkId();
|
||||||
|
|
||||||
|
offset = ByteBufferUtils.putInt(idxBuffer, offset, dataChunkID); // write data chunk id
|
||||||
|
offset = ByteBufferUtils.putInt(idxBuffer, offset, cell.getOffset()); // offset
|
||||||
|
offset = ByteBufferUtils.putInt(idxBuffer, offset, KeyValueUtil.length(cell)); // length
|
||||||
|
offset = ByteBufferUtils.putLong(idxBuffer, offset, cell.getSequenceId()); // seqId
|
||||||
|
|
||||||
|
return offset;
|
||||||
|
}
|
||||||
|
|
||||||
|
private int calculateNumberOfChunks(int numOfCells, int numOfCellsInChunk) {
|
||||||
|
int numberOfChunks = numOfCells/numOfCellsInChunk;
|
||||||
|
if(numOfCells%numOfCellsInChunk!=0) { // if cells cannot be divided evenly between chunks
|
||||||
|
numberOfChunks++; // add one additional chunk
|
||||||
|
}
|
||||||
|
return numberOfChunks;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,566 @@
|
||||||
|
/*
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.*;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.ClassSize;
|
||||||
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* compacted memstore test case
|
||||||
|
*/
|
||||||
|
@Category({RegionServerTests.class, MediumTests.class})
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore {
|
||||||
|
@Parameterized.Parameters
|
||||||
|
public static Object[] data() {
|
||||||
|
return new Object[] { "CHUNK_MAP", "ARRAY_MAP" }; // test different immutable indexes
|
||||||
|
}
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestCompactingToCellFlatMapMemStore.class);
|
||||||
|
public final boolean toCellChunkMap;
|
||||||
|
Configuration conf;
|
||||||
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Helpers
|
||||||
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
public TestCompactingToCellFlatMapMemStore(String type){
|
||||||
|
if (type == "CHUNK_MAP") {
|
||||||
|
toCellChunkMap = true;
|
||||||
|
} else {
|
||||||
|
toCellChunkMap = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void tearDown() throws Exception {
|
||||||
|
chunkCreator.clearChunksInPool();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void setUp() throws Exception {
|
||||||
|
|
||||||
|
compactingSetUp();
|
||||||
|
this.conf = HBaseConfiguration.create();
|
||||||
|
|
||||||
|
// set memstore to do data compaction
|
||||||
|
conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
|
||||||
|
String.valueOf(MemoryCompactionPolicy.EAGER));
|
||||||
|
|
||||||
|
this.memstore =
|
||||||
|
new CompactingMemStore(conf, CellComparator.COMPARATOR, store,
|
||||||
|
regionServicesForStores, MemoryCompactionPolicy.EAGER);
|
||||||
|
}
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Compaction tests
|
||||||
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
public void testCompaction1Bucket() throws IOException {
|
||||||
|
int counter = 0;
|
||||||
|
String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4
|
||||||
|
if (toCellChunkMap) {
|
||||||
|
// set memstore to flat into CellChunkMap
|
||||||
|
conf.set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
|
||||||
|
String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
|
||||||
|
((CompactingMemStore)memstore).setIndexType();
|
||||||
|
}
|
||||||
|
|
||||||
|
// test 1 bucket
|
||||||
|
long totalCellsLen = addRowsByKeys(memstore, keys1);
|
||||||
|
long cellBeforeFlushSize = cellBeforeFlushSize();
|
||||||
|
long cellAfterFlushSize = cellAfterFlushSize();
|
||||||
|
long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
|
||||||
|
|
||||||
|
assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
|
||||||
|
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
|
||||||
|
|
||||||
|
assertEquals(4, memstore.getActive().getCellsCount());
|
||||||
|
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
|
||||||
|
assertEquals(0, memstore.getSnapshot().getCellsCount());
|
||||||
|
// One cell is duplicated and the compaction will remove it. All cells of same size so adjusting
|
||||||
|
// totalCellsLen
|
||||||
|
totalCellsLen = (totalCellsLen * 3) / 4;
|
||||||
|
assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
|
||||||
|
|
||||||
|
totalHeapSize =
|
||||||
|
3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
|
||||||
|
+ (toCellChunkMap ?
|
||||||
|
CellChunkImmutableSegment.DEEP_OVERHEAD_CCM :
|
||||||
|
CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
|
||||||
|
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
|
||||||
|
for ( Segment s : memstore.getSegments()) {
|
||||||
|
counter += s.getCellsCount();
|
||||||
|
}
|
||||||
|
assertEquals(3, counter);
|
||||||
|
MemstoreSize size = memstore.getFlushableSize();
|
||||||
|
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
|
||||||
|
region.decrMemstoreSize(size); // simulate flusher
|
||||||
|
ImmutableSegment s = memstore.getSnapshot();
|
||||||
|
assertEquals(3, s.getCellsCount());
|
||||||
|
assertEquals(0, regionServicesForStores.getMemstoreSize());
|
||||||
|
|
||||||
|
memstore.clearSnapshot(snapshot.getId());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testCompaction2Buckets() throws IOException {
|
||||||
|
if (toCellChunkMap) {
|
||||||
|
// set memstore to flat into CellChunkMap
|
||||||
|
conf.set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
|
||||||
|
String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
|
||||||
|
((CompactingMemStore)memstore).setIndexType();
|
||||||
|
}
|
||||||
|
String[] keys1 = { "A", "A", "B", "C" };
|
||||||
|
String[] keys2 = { "A", "B", "D" };
|
||||||
|
|
||||||
|
long totalCellsLen1 = addRowsByKeys(memstore, keys1); // INSERT 4
|
||||||
|
long cellBeforeFlushSize = cellBeforeFlushSize();
|
||||||
|
long cellAfterFlushSize = cellAfterFlushSize();
|
||||||
|
long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
|
||||||
|
assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
|
||||||
|
assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
|
||||||
|
|
||||||
|
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
|
||||||
|
int counter = 0; // COMPACT 4->3
|
||||||
|
for ( Segment s : memstore.getSegments()) {
|
||||||
|
counter += s.getCellsCount();
|
||||||
|
}
|
||||||
|
assertEquals(3,counter);
|
||||||
|
assertEquals(0, memstore.getSnapshot().getCellsCount());
|
||||||
|
// One cell is duplicated and the compaction will remove it. All cells of same size so adjusting
|
||||||
|
// totalCellsLen
|
||||||
|
totalCellsLen1 = (totalCellsLen1 * 3) / 4;
|
||||||
|
totalHeapSize1 = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
|
||||||
|
+ (toCellChunkMap ?
|
||||||
|
CellChunkImmutableSegment.DEEP_OVERHEAD_CCM :
|
||||||
|
CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
|
||||||
|
assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
|
||||||
|
assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
|
||||||
|
|
||||||
|
long totalCellsLen2 = addRowsByKeys(memstore, keys2); // INSERT 3 (3+3=6)
|
||||||
|
long totalHeapSize2 = 3 * cellBeforeFlushSize;
|
||||||
|
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
|
||||||
|
assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
|
||||||
|
|
||||||
|
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
|
||||||
|
assertEquals(0, memstore.getSnapshot().getCellsCount());// COMPACT 6->4
|
||||||
|
counter = 0;
|
||||||
|
for ( Segment s : memstore.getSegments()) {
|
||||||
|
counter += s.getCellsCount();
|
||||||
|
}
|
||||||
|
assertEquals(4,counter);
|
||||||
|
totalCellsLen2 = totalCellsLen2 / 3;// 2 cells duplicated in set 2
|
||||||
|
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
|
||||||
|
totalHeapSize2 = 1 * cellAfterFlushSize;
|
||||||
|
assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
|
||||||
|
|
||||||
|
MemstoreSize size = memstore.getFlushableSize();
|
||||||
|
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
|
||||||
|
region.decrMemstoreSize(size); // simulate flusher
|
||||||
|
ImmutableSegment s = memstore.getSnapshot();
|
||||||
|
assertEquals(4, s.getCellsCount());
|
||||||
|
assertEquals(0, regionServicesForStores.getMemstoreSize());
|
||||||
|
|
||||||
|
memstore.clearSnapshot(snapshot.getId());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testCompaction3Buckets() throws IOException {
|
||||||
|
if (toCellChunkMap) {
|
||||||
|
// set memstore to flat into CellChunkMap
|
||||||
|
conf.set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
|
||||||
|
String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
|
||||||
|
((CompactingMemStore)memstore).setIndexType();
|
||||||
|
}
|
||||||
|
String[] keys1 = { "A", "A", "B", "C" };
|
||||||
|
String[] keys2 = { "A", "B", "D" };
|
||||||
|
String[] keys3 = { "D", "B", "B" };
|
||||||
|
|
||||||
|
long totalCellsLen1 = addRowsByKeys(memstore, keys1);
|
||||||
|
long cellBeforeFlushSize = cellBeforeFlushSize();
|
||||||
|
long cellAfterFlushSize = cellAfterFlushSize();
|
||||||
|
long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
|
||||||
|
assertEquals(totalCellsLen1, region.getMemstoreSize());
|
||||||
|
assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
|
||||||
|
|
||||||
|
MemstoreSize size = memstore.getFlushableSize();
|
||||||
|
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
|
||||||
|
|
||||||
|
assertEquals(0, memstore.getSnapshot().getCellsCount());
|
||||||
|
// One cell is duplicated and the compaction will remove it. All cells of same size so adjusting
|
||||||
|
// totalCellsLen
|
||||||
|
totalCellsLen1 = (totalCellsLen1 * 3) / 4;
|
||||||
|
totalHeapSize1 = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
|
||||||
|
+ (toCellChunkMap ?
|
||||||
|
CellChunkImmutableSegment.DEEP_OVERHEAD_CCM :
|
||||||
|
CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
|
||||||
|
assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
|
||||||
|
assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
|
||||||
|
|
||||||
|
long totalCellsLen2 = addRowsByKeys(memstore, keys2);
|
||||||
|
long totalHeapSize2 = 3 * cellBeforeFlushSize;
|
||||||
|
|
||||||
|
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
|
||||||
|
assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
|
||||||
|
|
||||||
|
((CompactingMemStore) memstore).disableCompaction();
|
||||||
|
size = memstore.getFlushableSize();
|
||||||
|
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
|
||||||
|
totalHeapSize2 = totalHeapSize2 + CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
|
||||||
|
assertEquals(0, memstore.getSnapshot().getCellsCount());
|
||||||
|
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
|
||||||
|
assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
|
||||||
|
|
||||||
|
long totalCellsLen3 = addRowsByKeys(memstore, keys3);
|
||||||
|
long totalHeapSize3 = 3 * cellBeforeFlushSize;
|
||||||
|
assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
|
||||||
|
regionServicesForStores.getMemstoreSize());
|
||||||
|
assertEquals(totalHeapSize1 + totalHeapSize2 + totalHeapSize3,
|
||||||
|
((CompactingMemStore) memstore).heapSize());
|
||||||
|
|
||||||
|
((CompactingMemStore) memstore).enableCompaction();
|
||||||
|
size = memstore.getFlushableSize();
|
||||||
|
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
|
||||||
|
while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
|
||||||
|
Threads.sleep(10);
|
||||||
|
}
|
||||||
|
assertEquals(0, memstore.getSnapshot().getCellsCount());
|
||||||
|
// active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells.
|
||||||
|
// Out of total 10, only 4 cells are unique
|
||||||
|
totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated
|
||||||
|
totalCellsLen3 = 0;// All duplicated cells.
|
||||||
|
assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
|
||||||
|
regionServicesForStores.getMemstoreSize());
|
||||||
|
// Only 4 unique cells left
|
||||||
|
long totalHeapSize4 = 4 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
|
||||||
|
+ (toCellChunkMap ?
|
||||||
|
CellChunkImmutableSegment.DEEP_OVERHEAD_CCM :
|
||||||
|
CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
|
||||||
|
assertEquals(totalHeapSize4, ((CompactingMemStore) memstore).heapSize());
|
||||||
|
|
||||||
|
size = memstore.getFlushableSize();
|
||||||
|
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
|
||||||
|
region.decrMemstoreSize(size); // simulate flusher
|
||||||
|
ImmutableSegment s = memstore.getSnapshot();
|
||||||
|
assertEquals(4, s.getCellsCount());
|
||||||
|
assertEquals(0, regionServicesForStores.getMemstoreSize());
|
||||||
|
|
||||||
|
memstore.clearSnapshot(snapshot.getId());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Merging tests
|
||||||
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
@Test
|
||||||
|
public void testMerging() throws IOException {
|
||||||
|
if (toCellChunkMap) {
|
||||||
|
// set memstore to flat into CellChunkMap
|
||||||
|
conf.set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
|
||||||
|
String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
|
||||||
|
((CompactingMemStore)memstore).setIndexType();
|
||||||
|
}
|
||||||
|
String[] keys1 = { "A", "A", "B", "C", "F", "H"};
|
||||||
|
String[] keys2 = { "A", "B", "D", "G", "I", "J"};
|
||||||
|
String[] keys3 = { "D", "B", "B", "E" };
|
||||||
|
|
||||||
|
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
|
||||||
|
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
|
||||||
|
String.valueOf(compactionType));
|
||||||
|
((CompactingMemStore)memstore).initiateType(compactionType);
|
||||||
|
addRowsByKeys(memstore, keys1);
|
||||||
|
|
||||||
|
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline should not compact
|
||||||
|
|
||||||
|
while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
|
||||||
|
Threads.sleep(10);
|
||||||
|
}
|
||||||
|
assertEquals(0, memstore.getSnapshot().getCellsCount());
|
||||||
|
|
||||||
|
addRowsByKeys(memstore, keys2); // also should only flatten
|
||||||
|
|
||||||
|
int counter2 = 0;
|
||||||
|
for ( Segment s : memstore.getSegments()) {
|
||||||
|
counter2 += s.getCellsCount();
|
||||||
|
}
|
||||||
|
assertEquals(12, counter2);
|
||||||
|
|
||||||
|
((CompactingMemStore) memstore).disableCompaction();
|
||||||
|
|
||||||
|
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening
|
||||||
|
assertEquals(0, memstore.getSnapshot().getCellsCount());
|
||||||
|
|
||||||
|
int counter3 = 0;
|
||||||
|
for ( Segment s : memstore.getSegments()) {
|
||||||
|
counter3 += s.getCellsCount();
|
||||||
|
}
|
||||||
|
assertEquals(12, counter3);
|
||||||
|
|
||||||
|
addRowsByKeys(memstore, keys3);
|
||||||
|
|
||||||
|
int counter4 = 0;
|
||||||
|
for ( Segment s : memstore.getSegments()) {
|
||||||
|
counter4 += s.getCellsCount();
|
||||||
|
}
|
||||||
|
assertEquals(16, counter4);
|
||||||
|
|
||||||
|
((CompactingMemStore) memstore).enableCompaction();
|
||||||
|
|
||||||
|
|
||||||
|
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
|
||||||
|
while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
|
||||||
|
Threads.sleep(10);
|
||||||
|
}
|
||||||
|
assertEquals(0, memstore.getSnapshot().getCellsCount());
|
||||||
|
|
||||||
|
int counter = 0;
|
||||||
|
for ( Segment s : memstore.getSegments()) {
|
||||||
|
counter += s.getCellsCount();
|
||||||
|
}
|
||||||
|
assertEquals(16,counter);
|
||||||
|
|
||||||
|
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
|
||||||
|
ImmutableSegment s = memstore.getSnapshot();
|
||||||
|
memstore.clearSnapshot(snapshot.getId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCountOfCellsAfterFlatteningByScan() throws IOException {
|
||||||
|
String[] keys1 = { "A", "B", "C" }; // A, B, C
|
||||||
|
addRowsByKeysWith50Cols(memstore, keys1);
|
||||||
|
// this should only flatten as there are no duplicates
|
||||||
|
((CompactingMemStore) memstore).flushInMemory();
|
||||||
|
while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
|
||||||
|
Threads.sleep(10);
|
||||||
|
}
|
||||||
|
List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
|
||||||
|
// seek
|
||||||
|
int count = 0;
|
||||||
|
for(int i = 0; i < scanners.size(); i++) {
|
||||||
|
scanners.get(i).seek(KeyValue.LOWESTKEY);
|
||||||
|
while (scanners.get(i).next() != null) {
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertEquals("the count should be ", count, 150);
|
||||||
|
for(int i = 0; i < scanners.size(); i++) {
|
||||||
|
scanners.get(i).close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCountOfCellsAfterFlatteningByIterator() throws IOException {
|
||||||
|
String[] keys1 = { "A", "B", "C" }; // A, B, C
|
||||||
|
addRowsByKeysWith50Cols(memstore, keys1);
|
||||||
|
// this should only flatten as there are no duplicates
|
||||||
|
((CompactingMemStore) memstore).flushInMemory();
|
||||||
|
while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
|
||||||
|
Threads.sleep(10);
|
||||||
|
}
|
||||||
|
// Just doing the cnt operation here
|
||||||
|
MemStoreSegmentsIterator itr = new MemStoreMergerSegmentsIterator(
|
||||||
|
((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(),
|
||||||
|
CellComparator.COMPARATOR, 10);
|
||||||
|
int cnt = 0;
|
||||||
|
try {
|
||||||
|
while (itr.next() != null) {
|
||||||
|
cnt++;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
itr.close();
|
||||||
|
}
|
||||||
|
assertEquals("the count should be ", cnt, 150);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addRowsByKeysWith50Cols(AbstractMemStore hmc, String[] keys) {
|
||||||
|
byte[] fam = Bytes.toBytes("testfamily");
|
||||||
|
for (int i = 0; i < keys.length; i++) {
|
||||||
|
long timestamp = System.currentTimeMillis();
|
||||||
|
Threads.sleep(1); // to make sure each kv gets a different ts
|
||||||
|
byte[] row = Bytes.toBytes(keys[i]);
|
||||||
|
for(int j =0 ;j < 50; j++) {
|
||||||
|
byte[] qf = Bytes.toBytes("testqualifier"+j);
|
||||||
|
byte[] val = Bytes.toBytes(keys[i] + j);
|
||||||
|
KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
|
||||||
|
hmc.add(kv, null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Test
|
||||||
|
public void testPuttingBackChunksWithOpeningScanner() throws IOException {
|
||||||
|
byte[] row = Bytes.toBytes("testrow");
|
||||||
|
byte[] fam = Bytes.toBytes("testfamily");
|
||||||
|
byte[] qf1 = Bytes.toBytes("testqualifier1");
|
||||||
|
byte[] qf2 = Bytes.toBytes("testqualifier2");
|
||||||
|
byte[] qf3 = Bytes.toBytes("testqualifier3");
|
||||||
|
byte[] qf4 = Bytes.toBytes("testqualifier4");
|
||||||
|
byte[] qf5 = Bytes.toBytes("testqualifier5");
|
||||||
|
byte[] qf6 = Bytes.toBytes("testqualifier6");
|
||||||
|
byte[] qf7 = Bytes.toBytes("testqualifier7");
|
||||||
|
byte[] val = Bytes.toBytes("testval");
|
||||||
|
|
||||||
|
// Setting up memstore
|
||||||
|
memstore.add(new KeyValue(row, fam, qf1, val), null);
|
||||||
|
memstore.add(new KeyValue(row, fam, qf2, val), null);
|
||||||
|
memstore.add(new KeyValue(row, fam, qf3, val), null);
|
||||||
|
|
||||||
|
// Creating a snapshot
|
||||||
|
MemStoreSnapshot snapshot = memstore.snapshot();
|
||||||
|
assertEquals(3, memstore.getSnapshot().getCellsCount());
|
||||||
|
|
||||||
|
// Adding value to "new" memstore
|
||||||
|
assertEquals(0, memstore.getActive().getCellsCount());
|
||||||
|
memstore.add(new KeyValue(row, fam, qf4, val), null);
|
||||||
|
memstore.add(new KeyValue(row, fam, qf5, val), null);
|
||||||
|
assertEquals(2, memstore.getActive().getCellsCount());
|
||||||
|
|
||||||
|
// opening scanner before clear the snapshot
|
||||||
|
List<KeyValueScanner> scanners = memstore.getScanners(0);
|
||||||
|
// Shouldn't putting back the chunks to pool,since some scanners are opening
|
||||||
|
// based on their data
|
||||||
|
// close the scanners
|
||||||
|
for(KeyValueScanner scanner : snapshot.getScanners()) {
|
||||||
|
scanner.close();
|
||||||
|
}
|
||||||
|
memstore.clearSnapshot(snapshot.getId());
|
||||||
|
|
||||||
|
assertTrue(chunkCreator.getPoolSize() == 0);
|
||||||
|
|
||||||
|
// Chunks will be put back to pool after close scanners;
|
||||||
|
for (KeyValueScanner scanner : scanners) {
|
||||||
|
scanner.close();
|
||||||
|
}
|
||||||
|
assertTrue(chunkCreator.getPoolSize() > 0);
|
||||||
|
|
||||||
|
// clear chunks
|
||||||
|
chunkCreator.clearChunksInPool();
|
||||||
|
|
||||||
|
// Creating another snapshot
|
||||||
|
|
||||||
|
snapshot = memstore.snapshot();
|
||||||
|
// Adding more value
|
||||||
|
memstore.add(new KeyValue(row, fam, qf6, val), null);
|
||||||
|
memstore.add(new KeyValue(row, fam, qf7, val), null);
|
||||||
|
// opening scanners
|
||||||
|
scanners = memstore.getScanners(0);
|
||||||
|
// close scanners before clear the snapshot
|
||||||
|
for (KeyValueScanner scanner : scanners) {
|
||||||
|
scanner.close();
|
||||||
|
}
|
||||||
|
// Since no opening scanner, the chunks of snapshot should be put back to
|
||||||
|
// pool
|
||||||
|
// close the scanners
|
||||||
|
for(KeyValueScanner scanner : snapshot.getScanners()) {
|
||||||
|
scanner.close();
|
||||||
|
}
|
||||||
|
memstore.clearSnapshot(snapshot.getId());
|
||||||
|
assertTrue(chunkCreator.getPoolSize() > 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPuttingBackChunksAfterFlushing() throws IOException {
|
||||||
|
byte[] row = Bytes.toBytes("testrow");
|
||||||
|
byte[] fam = Bytes.toBytes("testfamily");
|
||||||
|
byte[] qf1 = Bytes.toBytes("testqualifier1");
|
||||||
|
byte[] qf2 = Bytes.toBytes("testqualifier2");
|
||||||
|
byte[] qf3 = Bytes.toBytes("testqualifier3");
|
||||||
|
byte[] qf4 = Bytes.toBytes("testqualifier4");
|
||||||
|
byte[] qf5 = Bytes.toBytes("testqualifier5");
|
||||||
|
byte[] val = Bytes.toBytes("testval");
|
||||||
|
|
||||||
|
// Setting up memstore
|
||||||
|
memstore.add(new KeyValue(row, fam, qf1, val), null);
|
||||||
|
memstore.add(new KeyValue(row, fam, qf2, val), null);
|
||||||
|
memstore.add(new KeyValue(row, fam, qf3, val), null);
|
||||||
|
|
||||||
|
// Creating a snapshot
|
||||||
|
MemStoreSnapshot snapshot = memstore.snapshot();
|
||||||
|
assertEquals(3, memstore.getSnapshot().getCellsCount());
|
||||||
|
|
||||||
|
// Adding value to "new" memstore
|
||||||
|
assertEquals(0, memstore.getActive().getCellsCount());
|
||||||
|
memstore.add(new KeyValue(row, fam, qf4, val), null);
|
||||||
|
memstore.add(new KeyValue(row, fam, qf5, val), null);
|
||||||
|
assertEquals(2, memstore.getActive().getCellsCount());
|
||||||
|
// close the scanners
|
||||||
|
for(KeyValueScanner scanner : snapshot.getScanners()) {
|
||||||
|
scanner.close();
|
||||||
|
}
|
||||||
|
memstore.clearSnapshot(snapshot.getId());
|
||||||
|
|
||||||
|
int chunkCount = chunkCreator.getPoolSize();
|
||||||
|
assertTrue(chunkCount > 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private long addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
|
||||||
|
byte[] fam = Bytes.toBytes("testfamily");
|
||||||
|
byte[] qf = Bytes.toBytes("testqualifier");
|
||||||
|
MemstoreSize memstoreSize = new MemstoreSize();
|
||||||
|
for (int i = 0; i < keys.length; i++) {
|
||||||
|
long timestamp = System.currentTimeMillis();
|
||||||
|
Threads.sleep(1); // to make sure each kv gets a different ts
|
||||||
|
byte[] row = Bytes.toBytes(keys[i]);
|
||||||
|
byte[] val = Bytes.toBytes(keys[i] + i);
|
||||||
|
KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
|
||||||
|
hmc.add(kv, memstoreSize);
|
||||||
|
LOG.debug("added kv: " + kv.getKeyString() + ", timestamp" + kv.getTimestamp());
|
||||||
|
}
|
||||||
|
regionServicesForStores.addMemstoreSize(memstoreSize);
|
||||||
|
return memstoreSize.getDataSize();
|
||||||
|
}
|
||||||
|
|
||||||
|
private long cellBeforeFlushSize() {
|
||||||
|
// make one cell
|
||||||
|
byte[] row = Bytes.toBytes("A");
|
||||||
|
byte[] val = Bytes.toBytes("A" + 0);
|
||||||
|
KeyValue kv =
|
||||||
|
new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
|
||||||
|
System.currentTimeMillis(), val);
|
||||||
|
return ClassSize.align(
|
||||||
|
ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + KeyValueUtil.length(kv));
|
||||||
|
}
|
||||||
|
|
||||||
|
private long cellAfterFlushSize() {
|
||||||
|
// make one cell
|
||||||
|
byte[] row = Bytes.toBytes("A");
|
||||||
|
byte[] val = Bytes.toBytes("A" + 0);
|
||||||
|
KeyValue kv =
|
||||||
|
new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
|
||||||
|
System.currentTimeMillis(), val);
|
||||||
|
|
||||||
|
return toCellChunkMap ?
|
||||||
|
ClassSize.align(
|
||||||
|
ClassSize.CELL_CHUNK_MAP_ENTRY + KeyValueUtil.length(kv)) :
|
||||||
|
ClassSize.align(
|
||||||
|
ClassSize.CELL_ARRAY_MAP_ENTRY + KeyValue.FIXED_OVERHEAD + KeyValueUtil.length(kv));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue