HBASE-16417: In-memory MemStore Policy for Flattening and Compactions

This commit is contained in:
eshcar 2017-11-01 16:48:40 +02:00
parent 426ab1e931
commit 526d2826f5
23 changed files with 790 additions and 278 deletions

View File

@ -42,5 +42,11 @@ public enum MemoryCompactionPolicy {
* on-disk compaction does after the data is flushed to disk). This policy is most useful for * on-disk compaction does after the data is flushed to disk). This policy is most useful for
* applications with high data churn or small working sets. * applications with high data churn or small working sets.
*/ */
EAGER EAGER,
/**
* Adaptive compaction adapts to the workload. It applies either index compaction or data
* compaction based on the ratio of duplicate cells in the data.
*/
ADAPTIVE
} }

View File

@ -1612,6 +1612,35 @@ public final class CellUtil {
return matchingColumn(left, right); return matchingColumn(left, right);
} }
public static boolean matchingRowColumnBytes(final Cell left, final Cell right) {
int lrowlength = left.getRowLength();
int rrowlength = right.getRowLength();
int lfamlength = left.getFamilyLength();
int rfamlength = right.getFamilyLength();
int lqlength = left.getQualifierLength();
int rqlength = right.getQualifierLength();
// match length
if ((lrowlength + lfamlength + lqlength) !=
(rrowlength + rfamlength + rqlength)) {
return false;
}
// match row
if (!Bytes.equals(left.getRowArray(), left.getRowOffset(), lrowlength, right.getRowArray(),
right.getRowOffset(), rrowlength)) {
return false;
}
//match family
if (!Bytes.equals(left.getFamilyArray(), left.getFamilyOffset(), lfamlength,
right.getFamilyArray(), right.getFamilyOffset(), rfamlength)) {
return false;
}
//match qualifier
return Bytes.equals(left.getQualifierArray(), left.getQualifierOffset(),
lqlength, right.getQualifierArray(), right.getQualifierOffset(),
rqlength);
}
/** /**
* Compares the cell's qualifier with the given byte[] * Compares the cell's qualifier with the given byte[]
* @param left the cell for which the qualifier has to be compared * @param left the cell for which the qualifier has to be compared

View File

@ -330,9 +330,10 @@ public class ClassSize {
TIMERANGE = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2 + Bytes.SIZEOF_BOOLEAN); TIMERANGE = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2 + Bytes.SIZEOF_BOOLEAN);
SYNC_TIMERANGE_TRACKER = align(ClassSize.OBJECT + 2 * REFERENCE); SYNC_TIMERANGE_TRACKER = align(ClassSize.OBJECT + 2 * REFERENCE);
NON_SYNC_TIMERANGE_TRACKER = align(ClassSize.OBJECT + 2 * Bytes.SIZEOF_LONG); NON_SYNC_TIMERANGE_TRACKER = align(ClassSize.OBJECT + 2 * Bytes.SIZEOF_LONG);
CELL_SET = align(OBJECT + REFERENCE); CELL_SET = align(OBJECT + REFERENCE + Bytes.SIZEOF_INT);
STORE_SERVICES = align(OBJECT + REFERENCE + ATOMIC_LONG); STORE_SERVICES = align(OBJECT + REFERENCE + ATOMIC_LONG);
} }

View File

@ -0,0 +1,112 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Adaptive is a heuristic that chooses whether to apply data compaction or not based on the
* level of redundancy in the data. Adaptive triggers redundancy elimination only for those
* stores where positive impact is expected.
*
* Adaptive uses two parameters to determine whether to perform redundancy elimination.
* The first parameter, u, estimates the ratio of unique keys in the memory store based on the
* fraction of unique keys encountered during the previous merge of segment indices.
* The second is the perceived probability (compactionProbability) that the store can benefit from
* redundancy elimination. Initially, compactionProbability=0.5; it then grows exponentially by
* 2% whenever a compaction is successful and decreased by 2% whenever a compaction did not meet
* the expectation. It is reset back to the default value (namely 0.5) upon disk flush.
*
* Adaptive triggers redundancy elimination with probability compactionProbability if the
* fraction of redundant keys 1-u exceeds a parameter threshold compactionThreshold.
*/
@InterfaceAudience.Private
public class AdaptiveMemStoreCompactionStrategy extends MemStoreCompactionStrategy{
private static final String name = "ADAPTIVE";
public static final String ADAPTIVE_COMPACTION_THRESHOLD_KEY =
"hbase.hregion.compacting.memstore.adaptive.compaction.threshold";
private static final double ADAPTIVE_COMPACTION_THRESHOLD_DEFAULT = 0.5;
public static final String ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_KEY =
"hbase.hregion.compacting.memstore.adaptive.compaction.probability";
private static final double ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_DEFAULT = 0.5;
private static final double ADAPTIVE_PROBABILITY_FACTOR = 1.02;
private double compactionThreshold;
private double initialCompactionProbability;
private double compactionProbability;
private Random rand = new Random();
private double numCellsInVersionedList = 0;
private boolean compacted = false;
public AdaptiveMemStoreCompactionStrategy(Configuration conf, String cfName) {
super(conf, cfName);
compactionThreshold = conf.getDouble(ADAPTIVE_COMPACTION_THRESHOLD_KEY,
ADAPTIVE_COMPACTION_THRESHOLD_DEFAULT);
initialCompactionProbability = conf.getDouble(ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_KEY,
ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_DEFAULT);
resetStats();
}
@Override public Action getAction(VersionedSegmentsList versionedList) {
if (versionedList.getEstimatedUniquesFrac() < 1.0 - compactionThreshold) {
double r = rand.nextDouble();
if(r < compactionProbability) {
numCellsInVersionedList = versionedList.getNumOfCells();
compacted = true;
return compact(versionedList, name+" (compaction probability="+compactionProbability+")");
}
}
compacted = false;
return simpleMergeOrFlatten(versionedList,
name+" (compaction probability="+compactionProbability+")");
}
@Override
public void updateStats(Segment replacement) {
if(compacted) {
if (replacement.getCellsCount() / numCellsInVersionedList < 1.0 - compactionThreshold) {
// compaction was a good decision - increase probability
compactionProbability *= ADAPTIVE_PROBABILITY_FACTOR;
if(compactionProbability > 1.0) {
compactionProbability = 1.0;
}
} else {
// compaction was NOT a good decision - decrease probability
compactionProbability /= ADAPTIVE_PROBABILITY_FACTOR;
}
}
}
@Override
public void resetStats() {
compactionProbability = initialCompactionProbability;
}
protected Action getMergingAction() {
return Action.MERGE_COUNT_UNIQUE_KEYS;
}
protected Action getFlattenAction() {
return Action.FLATTEN;
}
}

View File

@ -0,0 +1,43 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Basic strategy chooses between two actions: flattening a segment or merging indices of all
* segments in the pipeline.
* If number of segments in pipeline exceed the limit defined in MemStoreCompactionStrategy then
* apply merge, otherwise flatten some segment.
*/
@InterfaceAudience.Private
public class BasicMemStoreCompactionStrategy extends MemStoreCompactionStrategy{
private static final String name = "BASIC";
public BasicMemStoreCompactionStrategy(Configuration conf, String cfName) {
super(conf, cfName);
}
@Override
public Action getAction(VersionedSegmentsList versionedList) {
return simpleMergeOrFlatten(versionedList, name);
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.ClassSize;
@ -42,7 +43,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
* The given iterator returns the Cells that "survived" the compaction. * The given iterator returns the Cells that "survived" the compaction.
*/ */
protected CellArrayImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator, protected CellArrayImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactor.Action action) { MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactionStrategy.Action action) {
super(null, comparator, memStoreLAB); // initiailize the CellSet with NULL super(null, comparator, memStoreLAB); // initiailize the CellSet with NULL
incSize(0, DEEP_OVERHEAD_CAM); incSize(0, DEEP_OVERHEAD_CAM);
// build the new CellSet based on CellArrayMap and update the CellSet of the new Segment // build the new CellSet based on CellArrayMap and update the CellSet of the new Segment
@ -54,12 +55,14 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
* of CSLMImmutableSegment * of CSLMImmutableSegment
* The given iterator returns the Cells that "survived" the compaction. * The given iterator returns the Cells that "survived" the compaction.
*/ */
protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemStoreSizing memstoreSizing) { protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemStoreSizing memstoreSizing,
MemStoreCompactionStrategy.Action action) {
super(segment); // initiailize the upper class super(segment); // initiailize the upper class
incSize(0, DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM); incSize(0, DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM);
int numOfCells = segment.getCellsCount(); int numOfCells = segment.getCellsCount();
// build the new CellSet based on CellChunkMap and update the CellSet of this Segment // build the new CellSet based on CellChunkMap and update the CellSet of this Segment
reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet()); reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet(),
action);
// arrange the meta-data size, decrease all meta-data sizes related to SkipList; // arrange the meta-data size, decrease all meta-data sizes related to SkipList;
// add sizes of CellArrayMap entry (reinitializeCellSet doesn't take the care for the sizes) // add sizes of CellArrayMap entry (reinitializeCellSet doesn't take the care for the sizes)
long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY); long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
@ -81,14 +84,18 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
/*------------------------------------------------------------------------*/ /*------------------------------------------------------------------------*/
// Create CellSet based on CellArrayMap from compacting iterator // Create CellSet based on CellArrayMap from compacting iterator
private void initializeCellSet(int numOfCells, MemStoreSegmentsIterator iterator, private void initializeCellSet(int numOfCells, MemStoreSegmentsIterator iterator,
MemStoreCompactor.Action action) { MemStoreCompactionStrategy.Action action) {
boolean merge = (action == MemStoreCompactionStrategy.Action.MERGE ||
action == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS);
Cell[] cells = new Cell[numOfCells]; // build the Cell Array Cell[] cells = new Cell[numOfCells]; // build the Cell Array
int i = 0; int i = 0;
int numUniqueKeys=0;
Cell prev = null;
while (iterator.hasNext()) { while (iterator.hasNext()) {
Cell c = iterator.next(); Cell c = iterator.next();
// The scanner behind the iterator is doing all the elimination logic // The scanner behind the iterator is doing all the elimination logic
if (action == MemStoreCompactor.Action.MERGE) { if (merge) {
// if this is merge we just move the Cell object without copying MSLAB // if this is merge we just move the Cell object without copying MSLAB
// the sizes still need to be updated in the new segment // the sizes still need to be updated in the new segment
cells[i] = c; cells[i] = c;
@ -99,11 +106,27 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
// second parameter true, because in compaction/merge the addition of the cell to new segment // second parameter true, because in compaction/merge the addition of the cell to new segment
// is always successful // is always successful
updateMetaInfo(c, true, null); // updates the size per cell updateMetaInfo(c, true, null); // updates the size per cell
if(action == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS) {
//counting number of unique keys
if (prev != null) {
if (!CellUtil.matchingRowColumnBytes(prev, c)) {
numUniqueKeys++;
}
} else {
numUniqueKeys++;
}
}
prev = c;
i++; i++;
} }
if(action == MemStoreCompactionStrategy.Action.COMPACT) {
numUniqueKeys = numOfCells;
} else if(action != MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS) {
numUniqueKeys = CellSet.UNKNOWN_NUM_UNIQUES;
}
// build the immutable CellSet // build the immutable CellSet
CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, i, false); CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, i, false);
this.setCellSet(null, new CellSet(cam)); // update the CellSet of this Segment this.setCellSet(null, new CellSet(cam, numUniqueKeys)); // update the CellSet of this Segment
} }
/*------------------------------------------------------------------------*/ /*------------------------------------------------------------------------*/
@ -111,22 +134,40 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
// (without compacting iterator) // (without compacting iterator)
// We do not consider cells bigger than chunks! // We do not consider cells bigger than chunks!
private void reinitializeCellSet( private void reinitializeCellSet(
int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet) { int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet,
MemStoreCompactionStrategy.Action action) {
Cell[] cells = new Cell[numOfCells]; // build the Cell Array Cell[] cells = new Cell[numOfCells]; // build the Cell Array
Cell curCell; Cell curCell;
int idx = 0; int idx = 0;
int numUniqueKeys=0;
Cell prev = null;
try { try {
while ((curCell = segmentScanner.next()) != null) { while ((curCell = segmentScanner.next()) != null) {
cells[idx++] = curCell; cells[idx++] = curCell;
if(action == MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) {
//counting number of unique keys
if (prev != null) {
if (!CellUtil.matchingRowColumn(prev, curCell)) {
numUniqueKeys++;
}
} else {
numUniqueKeys++;
}
}
prev = curCell;
} }
} catch (IOException ie) { } catch (IOException ie) {
throw new IllegalStateException(ie); throw new IllegalStateException(ie);
} finally { } finally {
segmentScanner.close(); segmentScanner.close();
} }
if(action != MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) {
numUniqueKeys = CellSet.UNKNOWN_NUM_UNIQUES;
}
// build the immutable CellSet // build the immutable CellSet
CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, idx, false); CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, idx, false);
this.setCellSet(oldCellSet, new CellSet(cam)); // update the CellSet of this Segment // update the CellSet of this Segment
this.setCellSet(oldCellSet, new CellSet(cam, numUniqueKeys));
} }
} }

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.ByteBufferKeyValue; import org.apache.hadoop.hbase.ByteBufferKeyValue;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -48,7 +49,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
* The given iterator returns the Cells that "survived" the compaction. * The given iterator returns the Cells that "survived" the compaction.
*/ */
protected CellChunkImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator, protected CellChunkImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactor.Action action) { MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactionStrategy.Action action) {
super(null, comparator, memStoreLAB); // initialize the CellSet with NULL super(null, comparator, memStoreLAB); // initialize the CellSet with NULL
incSize(0, DEEP_OVERHEAD_CCM); // initiate the heapSize with the size of the segment metadata incSize(0, DEEP_OVERHEAD_CCM); // initiate the heapSize with the size of the segment metadata
// build the new CellSet based on CellArrayMap and update the CellSet of the new Segment // build the new CellSet based on CellArrayMap and update the CellSet of the new Segment
@ -61,12 +62,13 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
* The given iterator returns the Cells that "survived" the compaction. * The given iterator returns the Cells that "survived" the compaction.
*/ */
protected CellChunkImmutableSegment(CSLMImmutableSegment segment, protected CellChunkImmutableSegment(CSLMImmutableSegment segment,
MemStoreSizing memstoreSizing) { MemStoreSizing memstoreSizing, MemStoreCompactionStrategy.Action action) {
super(segment); // initiailize the upper class super(segment); // initiailize the upper class
incSize(0,-CSLMImmutableSegment.DEEP_OVERHEAD_CSLM + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM); incSize(0,-CSLMImmutableSegment.DEEP_OVERHEAD_CSLM + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM);
int numOfCells = segment.getCellsCount(); int numOfCells = segment.getCellsCount();
// build the new CellSet based on CellChunkMap // build the new CellSet based on CellChunkMap
reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet()); reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet(),
action);
// arrange the meta-data size, decrease all meta-data sizes related to SkipList; // arrange the meta-data size, decrease all meta-data sizes related to SkipList;
// add sizes of CellChunkMap entry, decrease also Cell object sizes // add sizes of CellChunkMap entry, decrease also Cell object sizes
// (reinitializeCellSet doesn't take the care for the sizes) // (reinitializeCellSet doesn't take the care for the sizes)
@ -90,15 +92,17 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
/*------------------------------------------------------------------------*/ /*------------------------------------------------------------------------*/
// Create CellSet based on CellChunkMap from compacting iterator // Create CellSet based on CellChunkMap from compacting iterator
private void initializeCellSet(int numOfCells, MemStoreSegmentsIterator iterator, private void initializeCellSet(int numOfCells, MemStoreSegmentsIterator iterator,
MemStoreCompactor.Action action) { MemStoreCompactionStrategy.Action action) {
// calculate how many chunks we will need for index // calculate how many chunks we will need for index
int chunkSize = ChunkCreator.getInstance().getChunkSize(); int chunkSize = ChunkCreator.getInstance().getChunkSize();
int numOfCellsInChunk = CellChunkMap.NUM_OF_CELL_REPS_IN_CHUNK; int numOfCellsInChunk = CellChunkMap.NUM_OF_CELL_REPS_IN_CHUNK;
int numberOfChunks = calculateNumberOfChunks(numOfCells,numOfCellsInChunk); int numberOfChunks = calculateNumberOfChunks(numOfCells, numOfCellsInChunk);
int numOfCellsAfterCompaction = 0; int numOfCellsAfterCompaction = 0;
int currentChunkIdx = 0; int currentChunkIdx = 0;
int offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER; int offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
int numUniqueKeys=0;
Cell prev = null;
// all index Chunks are allocated from ChunkCreator // all index Chunks are allocated from ChunkCreator
Chunk[] chunks = new Chunk[numberOfChunks]; Chunk[] chunks = new Chunk[numberOfChunks];
for (int i=0; i < numberOfChunks; i++) { for (int i=0; i < numberOfChunks; i++) {
@ -112,7 +116,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
currentChunkIdx++; // continue to the next index chunk currentChunkIdx++; // continue to the next index chunk
offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER; offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
} }
if (action == MemStoreCompactor.Action.COMPACT) { if (action == MemStoreCompactionStrategy.Action.COMPACT) {
c = maybeCloneWithAllocator(c); // for compaction copy cell to the new segment (MSLAB copy) c = maybeCloneWithAllocator(c); // for compaction copy cell to the new segment (MSLAB copy)
} }
offsetInCurentChunk = // add the Cell reference to the index chunk offsetInCurentChunk = // add the Cell reference to the index chunk
@ -122,11 +126,27 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
// second parameter true, because in compaction/merge the addition of the cell to new segment // second parameter true, because in compaction/merge the addition of the cell to new segment
// is always successful // is always successful
updateMetaInfo(c, true, null); // updates the size per cell updateMetaInfo(c, true, null); // updates the size per cell
if(action == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS) {
//counting number of unique keys
if (prev != null) {
if (!CellUtil.matchingRowColumnBytes(prev, c)) {
numUniqueKeys++;
}
} else {
numUniqueKeys++;
}
}
prev = c;
}
if(action == MemStoreCompactionStrategy.Action.COMPACT) {
numUniqueKeys = numOfCells;
} else if(action != MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS) {
numUniqueKeys = CellSet.UNKNOWN_NUM_UNIQUES;
} }
// build the immutable CellSet // build the immutable CellSet
CellChunkMap ccm = CellChunkMap ccm =
new CellChunkMap(getComparator(), chunks, 0, numOfCellsAfterCompaction, false); new CellChunkMap(getComparator(), chunks, 0, numOfCellsAfterCompaction, false);
this.setCellSet(null, new CellSet(ccm)); // update the CellSet of this Segment this.setCellSet(null, new CellSet(ccm, numUniqueKeys)); // update the CellSet of this Segment
} }
/*------------------------------------------------------------------------*/ /*------------------------------------------------------------------------*/
@ -135,12 +155,13 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
// This is a service for not-flat immutable segments // This is a service for not-flat immutable segments
// Assumption: cells do not exceed chunk size! // Assumption: cells do not exceed chunk size!
private void reinitializeCellSet( private void reinitializeCellSet(
int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet) { int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet,
MemStoreCompactionStrategy.Action action) {
Cell curCell; Cell curCell;
// calculate how many chunks we will need for metadata // calculate how many chunks we will need for metadata
int chunkSize = ChunkCreator.getInstance().getChunkSize(); int chunkSize = ChunkCreator.getInstance().getChunkSize();
int numOfCellsInChunk = CellChunkMap.NUM_OF_CELL_REPS_IN_CHUNK; int numOfCellsInChunk = CellChunkMap.NUM_OF_CELL_REPS_IN_CHUNK;
int numberOfChunks = calculateNumberOfChunks(numOfCells,numOfCellsInChunk); int numberOfChunks = calculateNumberOfChunks(numOfCells, numOfCellsInChunk);
// all index Chunks are allocated from ChunkCreator // all index Chunks are allocated from ChunkCreator
Chunk[] chunks = new Chunk[numberOfChunks]; Chunk[] chunks = new Chunk[numberOfChunks];
for (int i=0; i < numberOfChunks; i++) { for (int i=0; i < numberOfChunks; i++) {
@ -150,6 +171,8 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
int currentChunkIdx = 0; int currentChunkIdx = 0;
int offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER; int offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
int numUniqueKeys=0;
Cell prev = null;
try { try {
while ((curCell = segmentScanner.next()) != null) { while ((curCell = segmentScanner.next()) != null) {
assert (curCell instanceof ByteBufferKeyValue); // shouldn't get here anything but ByteBufferKeyValue assert (curCell instanceof ByteBufferKeyValue); // shouldn't get here anything but ByteBufferKeyValue
@ -161,6 +184,20 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
offsetInCurentChunk = offsetInCurentChunk =
createCellReference((ByteBufferKeyValue) curCell, chunks[currentChunkIdx].getData(), createCellReference((ByteBufferKeyValue) curCell, chunks[currentChunkIdx].getData(),
offsetInCurentChunk); offsetInCurentChunk);
if(action == MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) {
//counting number of unique keys
if (prev != null) {
if (!CellUtil.matchingRowColumn(prev, curCell)) {
numUniqueKeys++;
}
} else {
numUniqueKeys++;
}
}
prev = curCell;
}
if(action != MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) {
numUniqueKeys = CellSet.UNKNOWN_NUM_UNIQUES;
} }
} catch (IOException ie) { } catch (IOException ie) {
throw new IllegalStateException(ie); throw new IllegalStateException(ie);
@ -169,7 +206,8 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
} }
CellChunkMap ccm = new CellChunkMap(getComparator(), chunks, 0, numOfCells, false); CellChunkMap ccm = new CellChunkMap(getComparator(), chunks, 0, numOfCells, false);
this.setCellSet(oldCellSet, new CellSet(ccm)); // update the CellSet of this Segment // update the CellSet of this Segment
this.setCellSet(oldCellSet, new CellSet(ccm, numUniqueKeys));
} }
/*------------------------------------------------------------------------*/ /*------------------------------------------------------------------------*/

View File

@ -41,6 +41,8 @@ import org.apache.yetus.audience.InterfaceAudience;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class CellSet implements NavigableSet<Cell> { public class CellSet implements NavigableSet<Cell> {
public static final int UNKNOWN_NUM_UNIQUES = -1;
// Implemented on top of a {@link java.util.concurrent.ConcurrentSkipListMap} // Implemented on top of a {@link java.util.concurrent.ConcurrentSkipListMap}
// Differ from CSLS in one respect, where CSLS does "Adds the specified element to this set if it // Differ from CSLS in one respect, where CSLS does "Adds the specified element to this set if it
// is not already present.", this implementation "Adds the specified element to this set EVEN // is not already present.", this implementation "Adds the specified element to this set EVEN
@ -48,12 +50,22 @@ public class CellSet implements NavigableSet<Cell> {
// Otherwise, has same attributes as ConcurrentSkipListSet // Otherwise, has same attributes as ConcurrentSkipListSet
private final NavigableMap<Cell, Cell> delegatee; /// private final NavigableMap<Cell, Cell> delegatee; ///
private final int numUniqueKeys;
CellSet(final CellComparator c) { CellSet(final CellComparator c) {
this.delegatee = new ConcurrentSkipListMap<>(c); this.delegatee = new ConcurrentSkipListMap<>(c);
this.numUniqueKeys = UNKNOWN_NUM_UNIQUES;
} }
CellSet(final NavigableMap<Cell, Cell> m, int numUniqueKeys) {
this.delegatee = m;
this.numUniqueKeys = numUniqueKeys;
}
@VisibleForTesting
CellSet(final NavigableMap<Cell, Cell> m) { CellSet(final NavigableMap<Cell, Cell> m) {
this.delegatee = m; this.delegatee = m;
this.numUniqueKeys = UNKNOWN_NUM_UNIQUES;
} }
@VisibleForTesting @VisibleForTesting
@ -83,7 +95,7 @@ public class CellSet implements NavigableSet<Cell> {
public NavigableSet<Cell> headSet(final Cell toElement, public NavigableSet<Cell> headSet(final Cell toElement,
boolean inclusive) { boolean inclusive) {
return new CellSet(this.delegatee.headMap(toElement, inclusive)); return new CellSet(this.delegatee.headMap(toElement, inclusive), UNKNOWN_NUM_UNIQUES);
} }
public Cell higher(Cell e) { public Cell higher(Cell e) {
@ -120,7 +132,7 @@ public class CellSet implements NavigableSet<Cell> {
} }
public NavigableSet<Cell> tailSet(Cell fromElement, boolean inclusive) { public NavigableSet<Cell> tailSet(Cell fromElement, boolean inclusive) {
return new CellSet(this.delegatee.tailMap(fromElement, inclusive)); return new CellSet(this.delegatee.tailMap(fromElement, inclusive), UNKNOWN_NUM_UNIQUES);
} }
public Comparator<? super Cell> comparator() { public Comparator<? super Cell> comparator() {
@ -187,4 +199,8 @@ public class CellSet implements NavigableSet<Cell> {
public <T> T[] toArray(T[] a) { public <T> T[] toArray(T[] a) {
throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED); throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED);
} }
public int getNumUniqueKeys() {
return numUniqueKeys;
}
} }

View File

@ -18,6 +18,7 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -66,13 +67,13 @@ public class CompactingMemStore extends AbstractMemStore {
// Default fraction of in-memory-flush size w.r.t. flush-to-disk size // Default fraction of in-memory-flush size w.r.t. flush-to-disk size
public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY = public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
"hbase.memstore.inmemoryflush.threshold.factor"; "hbase.memstore.inmemoryflush.threshold.factor";
private static final double IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT = 0.25; private static final double IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT = 0.02;
private static final Log LOG = LogFactory.getLog(CompactingMemStore.class); private static final Log LOG = LogFactory.getLog(CompactingMemStore.class);
private HStore store; private HStore store;
private RegionServicesForStores regionServices; private RegionServicesForStores regionServices;
private CompactionPipeline pipeline; private CompactionPipeline pipeline;
private MemStoreCompactor compactor; protected MemStoreCompactor compactor;
private long inmemoryFlushSize; // the threshold on active size for in-memory flush private long inmemoryFlushSize; // the threshold on active size for in-memory flush
private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false); private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false);
@ -81,7 +82,7 @@ public class CompactingMemStore extends AbstractMemStore {
private boolean inWalReplay = false; private boolean inWalReplay = false;
@VisibleForTesting @VisibleForTesting
private final AtomicBoolean allowCompaction = new AtomicBoolean(true); protected final AtomicBoolean allowCompaction = new AtomicBoolean(true);
private boolean compositeSnapshot = true; private boolean compositeSnapshot = true;
/** /**
@ -119,7 +120,8 @@ public class CompactingMemStore extends AbstractMemStore {
} }
@VisibleForTesting @VisibleForTesting
protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) { protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
throws IllegalArgumentIOException {
return new MemStoreCompactor(this, compactionPolicy); return new MemStoreCompactor(this, compactionPolicy);
} }
@ -205,6 +207,7 @@ public class CompactingMemStore extends AbstractMemStore {
} else { } else {
pushTailToSnapshot(); pushTailToSnapshot();
} }
compactor.resetStats();
} }
return new MemStoreSnapshot(snapshotId, this.snapshot); return new MemStoreSnapshot(snapshotId, this.snapshot);
} }
@ -298,10 +301,6 @@ public class CompactingMemStore extends AbstractMemStore {
this.compositeSnapshot = useCompositeSnapshot; this.compositeSnapshot = useCompositeSnapshot;
} }
public boolean isCompositeSnapshot() {
return this.compositeSnapshot;
}
public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result, public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result,
boolean merge) { boolean merge) {
// last true stands for updating the region size // last true stands for updating the region size
@ -313,8 +312,8 @@ public class CompactingMemStore extends AbstractMemStore {
* with version taken earlier. This version must be passed as a parameter here. * with version taken earlier. This version must be passed as a parameter here.
* The flattening happens only if versions match. * The flattening happens only if versions match.
*/ */
public void flattenOneSegment(long requesterVersion) { public void flattenOneSegment(long requesterVersion, MemStoreCompactionStrategy.Action action) {
pipeline.flattenOneSegment(requesterVersion, indexType); pipeline.flattenOneSegment(requesterVersion, indexType, action);
} }
// setter is used only for testability // setter is used only for testability
@ -543,29 +542,11 @@ public class CompactingMemStore extends AbstractMemStore {
} }
} }
//----------------------------------------------------------------------
//methods for tests
//----------------------------------------------------------------------
@VisibleForTesting @VisibleForTesting
boolean isMemStoreFlushingInMemory() { boolean isMemStoreFlushingInMemory() {
return inMemoryFlushInProgress.get(); return inMemoryFlushInProgress.get();
} }
@VisibleForTesting
void disableCompaction() {
allowCompaction.set(false);
}
@VisibleForTesting
void enableCompaction() {
allowCompaction.set(true);
}
@VisibleForTesting
void initiateType(MemoryCompactionPolicy compactionType) {
compactor.initiateAction(compactionType);
}
/** /**
* @param cell Find the row that comes after this one. If null, we return the * @param cell Find the row that comes after this one. If null, we return the
* first. * first.

View File

@ -126,14 +126,10 @@ public class CompactionPipeline {
} }
suffix = versionedList.getStoreSegments(); suffix = versionedList.getStoreSegments();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
int count = 0;
if(segment != null) {
count = segment.getCellsCount();
}
LOG.debug("Swapping pipeline suffix. " LOG.debug("Swapping pipeline suffix. "
+ "Just before the swap the number of segments in pipeline is:" + "Just before the swap the number of segments in pipeline is:"
+ versionedList.getStoreSegments().size() + versionedList.getStoreSegments().size()
+ ", and the number of cells in new segment is:" + count); + ", and the new segment is:" + segment);
} }
swapSuffix(suffix, segment, closeSuffix); swapSuffix(suffix, segment, closeSuffix);
readOnlyCopy = new LinkedList<>(pipeline); readOnlyCopy = new LinkedList<>(pipeline);
@ -183,7 +179,9 @@ public class CompactionPipeline {
* *
* @return true iff a segment was successfully flattened * @return true iff a segment was successfully flattened
*/ */
public boolean flattenOneSegment(long requesterVersion, CompactingMemStore.IndexType idxType) { public boolean flattenOneSegment(long requesterVersion,
CompactingMemStore.IndexType idxType,
MemStoreCompactionStrategy.Action action) {
if(requesterVersion != version) { if(requesterVersion != version) {
LOG.warn("Segment flattening failed, because versions do not match. Requester version: " LOG.warn("Segment flattening failed, because versions do not match. Requester version: "
@ -201,7 +199,7 @@ public class CompactionPipeline {
if ( s.canBeFlattened() ) { if ( s.canBeFlattened() ) {
MemStoreSizing newMemstoreAccounting = new MemStoreSizing(); // the size to be updated MemStoreSizing newMemstoreAccounting = new MemStoreSizing(); // the size to be updated
ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening( ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
(CSLMImmutableSegment)s,idxType,newMemstoreAccounting); (CSLMImmutableSegment)s,idxType,newMemstoreAccounting,action);
replaceAtIndex(i,newS); replaceAtIndex(i,newS);
if(region != null) { if(region != null) {
// update the global memstore size counter // update the global memstore size counter

View File

@ -0,0 +1,36 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class EagerMemStoreCompactionStrategy extends MemStoreCompactionStrategy{
private static final String name = "EAGER";
public EagerMemStoreCompactionStrategy(Configuration conf, String cfName) {
super(conf, cfName);
}
@Override
public Action getAction(VersionedSegmentsList versionedList) {
return compact(versionedList, name);
}
}

View File

@ -278,19 +278,17 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
} }
String className; String className;
switch (inMemoryCompaction) { switch (inMemoryCompaction) {
case BASIC: case NONE:
case EAGER: className = DefaultMemStore.class.getName();
Class<? extends CompactingMemStore> clz = this.memstore = ReflectionUtils.newInstance(DefaultMemStore.class,
conf.getClass(MEMSTORE_CLASS_NAME, CompactingMemStore.class, CompactingMemStore.class); new Object[] { conf, this.comparator });
className = clz.getName(); break;
this.memstore = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this, default:
this.getHRegion().getRegionServicesForStores(), inMemoryCompaction }); Class<? extends CompactingMemStore> clz = conf.getClass(MEMSTORE_CLASS_NAME,
break; CompactingMemStore.class, CompactingMemStore.class);
case NONE: className = clz.getName();
default: this.memstore = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this,
className = DefaultMemStore.class.getName(); this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
this.memstore = ReflectionUtils.newInstance(DefaultMemStore.class,
new Object[] { conf, this.comparator });
} }
LOG.info("Memstore class name is " + className); LOG.info("Memstore class name is " + className);
this.offPeakHours = OffPeakHours.getInstance(conf); this.offPeakHours = OffPeakHours.getInstance(conf);

View File

@ -40,6 +40,10 @@ public abstract class ImmutableSegment extends Segment {
// each sub-type of immutable segment knows whether it is flat or not // each sub-type of immutable segment knows whether it is flat or not
protected abstract boolean canBeFlattened(); protected abstract boolean canBeFlattened();
public int getNumUniqueKeys() {
return getCellSet().getNumUniqueKeys();
}
///////////////////// CONSTRUCTORS ///////////////////// ///////////////////// CONSTRUCTORS /////////////////////
/**------------------------------------------------------------------------ /**------------------------------------------------------------------------
* Empty C-tor to be used only for CompositeImmutableSegment * Empty C-tor to be used only for CompositeImmutableSegment
@ -64,7 +68,6 @@ public abstract class ImmutableSegment extends Segment {
super(segment); super(segment);
} }
///////////////////// PUBLIC METHODS ///////////////////// ///////////////////// PUBLIC METHODS /////////////////////
public int getNumOfSegments() { public int getNumOfSegments() {
@ -75,4 +78,11 @@ public abstract class ImmutableSegment extends Segment {
List<Segment> res = new ArrayList<>(Arrays.asList(this)); List<Segment> res = new ArrayList<>(Arrays.asList(this));
return res; return res;
} }
@Override
public String toString() {
String res = super.toString();
res += "Num uniques "+getNumUniqueKeys()+"; ";
return res;
}
} }

View File

@ -0,0 +1,114 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
/**
* MemStoreCompactionStrategy is the root of a class hierarchy which defines the strategy for
* choosing the next action to apply in an (in-memory) memstore compaction.
* Possible action are:
* - No-op - do nothing
* - Flatten - to change the segment's index from CSLM to a flat representation
* - Merge - to merge the indices of the segments in the pipeline
* - Compact - to merge the indices while removing data redundancies
*
* In addition while applying flat/merge actions it is possible to count the number of unique
* keys in the result segment.
*/
@InterfaceAudience.Private
public abstract class MemStoreCompactionStrategy {
protected static final Log LOG = LogFactory.getLog(MemStoreCompactionStrategy.class);
// The upper bound for the number of segments we store in the pipeline prior to merging.
public static final String COMPACTING_MEMSTORE_THRESHOLD_KEY =
"hbase.hregion.compacting.pipeline.segments.limit";
public static final int COMPACTING_MEMSTORE_THRESHOLD_DEFAULT = 4;
/**
* Types of actions to be done on the pipeline upon MemStoreCompaction invocation.
* Note that every value covers the previous ones, i.e. if MERGE is the action it implies
* that the youngest segment is going to be flatten anyway.
*/
public enum Action {
NOOP,
FLATTEN, // flatten a segment in the pipeline
FLATTEN_COUNT_UNIQUE_KEYS, // flatten a segment in the pipeline and count its unique keys
MERGE, // merge all the segments in the pipeline into one
MERGE_COUNT_UNIQUE_KEYS, // merge all pipeline segments into one and count its unique keys
COMPACT // compact the data of all pipeline segments
}
protected final String cfName;
// The limit on the number of the segments in the pipeline
protected final int pipelineThreshold;
public MemStoreCompactionStrategy(Configuration conf, String cfName) {
this.cfName = cfName;
if(conf == null) {
pipelineThreshold = COMPACTING_MEMSTORE_THRESHOLD_DEFAULT;
} else {
pipelineThreshold = // get the limit on the number of the segments in the pipeline
conf.getInt(COMPACTING_MEMSTORE_THRESHOLD_KEY, COMPACTING_MEMSTORE_THRESHOLD_DEFAULT);
}
}
// get next compaction action to apply on compaction pipeline
public abstract Action getAction(VersionedSegmentsList versionedList);
// update policy stats based on the segment that replaced previous versioned list (in
// compaction pipeline)
public void updateStats(Segment replacement) {}
// resets policy stats
public void resetStats() {}
protected Action simpleMergeOrFlatten(VersionedSegmentsList versionedList, String strategy) {
int numOfSegments = versionedList.getNumOfSegments();
if (numOfSegments > pipelineThreshold) {
// to avoid too many segments, merge now
LOG.debug(strategy+" memory compaction for store " + cfName
+ " merging " + numOfSegments + " segments");
return getMergingAction();
}
// just flatten a segment
LOG.debug(strategy+" memory compaction for store " + cfName
+ " flattening a segment in the pipeline");
return getFlattenAction();
}
protected Action getMergingAction() {
return Action.MERGE;
}
protected Action getFlattenAction() {
return Action.FLATTEN;
}
protected Action compact(VersionedSegmentsList versionedList, String strategyInfo) {
int numOfSegments = versionedList.getNumOfSegments();
LOG.debug(strategyInfo+" memory compaction for store " + cfName
+ " compacting " + numOfSegments + " segments");
return Action.COMPACT;
}
}

View File

@ -18,9 +18,11 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MemoryCompactionPolicy; import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -44,26 +46,16 @@ import java.util.concurrent.atomic.AtomicBoolean;
@InterfaceAudience.Private @InterfaceAudience.Private
public class MemStoreCompactor { public class MemStoreCompactor {
// The upper bound for the number of segments we store in the pipeline prior to merging.
// This constant is subject to further experimentation.
// The external setting of the compacting MemStore behaviour
public static final String COMPACTING_MEMSTORE_THRESHOLD_KEY =
"hbase.hregion.compacting.pipeline.segments.limit";
// remaining with the same ("infinity") but configurable default for now
public static final int COMPACTING_MEMSTORE_THRESHOLD_DEFAULT = 1;
public static final long DEEP_OVERHEAD = ClassSize public static final long DEEP_OVERHEAD = ClassSize
.align(ClassSize.OBJECT .align(ClassSize.OBJECT + 4 * ClassSize.REFERENCE
+ 4 * ClassSize.REFERENCE // compactingMemStore, versionedList, isInterrupted, strategy (the reference)
// compactingMemStore, versionedList, action, isInterrupted (the reference)
// "action" is an enum and thus it is a class with static final constants, // "action" is an enum and thus it is a class with static final constants,
// so counting only the size of the reference to it and not the size of the internals // so counting only the size of the reference to it and not the size of the internals
+ 2 * Bytes.SIZEOF_INT // compactionKVMax, pipelineThreshold + Bytes.SIZEOF_INT // compactionKVMax
+ ClassSize.ATOMIC_BOOLEAN // isInterrupted (the internals) + ClassSize.ATOMIC_BOOLEAN // isInterrupted (the internals)
); );
private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class); private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class);
private final int pipelineThreshold; // the limit on the number of the segments in the pipeline
private CompactingMemStore compactingMemStore; private CompactingMemStore compactingMemStore;
// a static version of the segment list from the pipeline // a static version of the segment list from the pipeline
@ -75,29 +67,15 @@ public class MemStoreCompactor {
// the limit to the size of the groups to be later provided to MemStoreSegmentsIterator // the limit to the size of the groups to be later provided to MemStoreSegmentsIterator
private final int compactionKVMax; private final int compactionKVMax;
/** private MemStoreCompactionStrategy strategy;
* Types of actions to be done on the pipeline upon MemStoreCompaction invocation.
* Note that every value covers the previous ones, i.e. if MERGE is the action it implies
* that the youngest segment is going to be flatten anyway.
*/
public enum Action {
NOOP,
FLATTEN, // flatten the youngest segment in the pipeline
MERGE, // merge all the segments in the pipeline into one
COMPACT // copy-compact the data of all the segments in the pipeline
}
private Action action = Action.FLATTEN;
public MemStoreCompactor(CompactingMemStore compactingMemStore, public MemStoreCompactor(CompactingMemStore compactingMemStore,
MemoryCompactionPolicy compactionPolicy) { MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException {
this.compactingMemStore = compactingMemStore; this.compactingMemStore = compactingMemStore;
this.compactionKVMax = compactingMemStore.getConfiguration() this.compactionKVMax = compactingMemStore.getConfiguration()
.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); .getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
initiateAction(compactionPolicy); initiateCompactionStrategy(compactionPolicy, compactingMemStore.getConfiguration(),
pipelineThreshold = // get the limit on the number of the segments in the pipeline compactingMemStore.getFamilyName());
compactingMemStore.getConfiguration().getInt(COMPACTING_MEMSTORE_THRESHOLD_KEY,
COMPACTING_MEMSTORE_THRESHOLD_DEFAULT);
} }
/**---------------------------------------------------------------------- /**----------------------------------------------------------------------
@ -132,11 +110,9 @@ public class MemStoreCompactor {
isInterrupted.compareAndSet(false, true); isInterrupted.compareAndSet(false, true);
} }
/**----------------------------------------------------------------------
* The interface to check whether user requested the index-compaction public void resetStats() {
*/ strategy.resetStats();
public boolean isIndexCompaction() {
return (action == Action.MERGE);
} }
/**---------------------------------------------------------------------- /**----------------------------------------------------------------------
@ -148,40 +124,6 @@ public class MemStoreCompactor {
versionedList = null; versionedList = null;
} }
/**----------------------------------------------------------------------
* Decide what to do with the new and old segments in the compaction pipeline.
* Implements basic in-memory compaction policy.
*/
private Action policy() {
if (isInterrupted.get()) { // if the entire process is interrupted cancel flattening
return Action.NOOP; // the compaction also doesn't start when interrupted
}
if (action == Action.COMPACT) { // compact according to the user request
LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName()
+ " is going to be compacted to the " + compactingMemStore.getIndexType() + ". Number of"
+ " cells before compaction is " + versionedList.getNumOfCells());
return Action.COMPACT;
}
// compaction shouldn't happen or doesn't worth it
// limit the number of the segments in the pipeline
int numOfSegments = versionedList.getNumOfSegments();
if (numOfSegments > pipelineThreshold) {
LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName()
+ " is going to be merged to the " + compactingMemStore.getIndexType()
+ ", as there are " + numOfSegments + " segments");
return Action.MERGE; // to avoid too many segments, merge now
}
// if nothing of the above, then just flatten the newly joined segment
LOG.debug("The youngest segment in the in-Memory Compaction Pipeline for store "
+ compactingMemStore.getFamilyName() + " is going to be flattened to the "
+ compactingMemStore.getIndexType());
return Action.FLATTEN;
}
/**---------------------------------------------------------------------- /**----------------------------------------------------------------------
* The worker thread performs the compaction asynchronously. * The worker thread performs the compaction asynchronously.
* The solo (per compactor) thread only reads the compaction pipeline. * The solo (per compactor) thread only reads the compaction pipeline.
@ -190,29 +132,37 @@ public class MemStoreCompactor {
private void doCompaction() { private void doCompaction() {
ImmutableSegment result = null; ImmutableSegment result = null;
boolean resultSwapped = false; boolean resultSwapped = false;
Action nextStep = null; if (isInterrupted.get()) { // if the entire process is interrupted cancel flattening
try { return; // the compaction also doesn't start when interrupted
nextStep = policy(); }
if (nextStep == Action.NOOP) { MemStoreCompactionStrategy.Action nextStep = strategy.getAction(versionedList);
boolean merge =
(nextStep == MemStoreCompactionStrategy.Action.MERGE ||
nextStep == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS);
try {
if (nextStep == MemStoreCompactionStrategy.Action.NOOP) {
return; return;
} }
if (nextStep == Action.FLATTEN) { if (nextStep == MemStoreCompactionStrategy.Action.FLATTEN ||
// Youngest Segment in the pipeline is with SkipList index, make it flat nextStep == MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) {
compactingMemStore.flattenOneSegment(versionedList.getVersion()); // some Segment in the pipeline is with SkipList index, make it flat
compactingMemStore.flattenOneSegment(versionedList.getVersion(), nextStep);
return; return;
} }
// Create one segment representing all segments in the compaction pipeline, // Create one segment representing all segments in the compaction pipeline,
// either by compaction or by merge // either by compaction or by merge
if (!isInterrupted.get()) { if (!isInterrupted.get()) {
result = createSubstitution(); result = createSubstitution(nextStep);
} }
// Substitute the pipeline with one segment // Substitute the pipeline with one segment
if (!isInterrupted.get()) { if (!isInterrupted.get()) {
if (resultSwapped = compactingMemStore.swapCompactedSegments( if (resultSwapped = compactingMemStore.swapCompactedSegments(
versionedList, result, (action==Action.MERGE))) { versionedList, result, merge)) {
// update compaction strategy
strategy.updateStats(result);
// update the wal so it can be truncated and not get too long // update the wal so it can be truncated and not get too long
compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater
} }
@ -226,10 +176,8 @@ public class MemStoreCompactor {
// we DON'T need to close the result segment (meaning its MSLAB)! // we DON'T need to close the result segment (meaning its MSLAB)!
// Because closing the result segment means closing the chunks of all segments // Because closing the result segment means closing the chunks of all segments
// in the compaction pipeline, which still have ongoing scans. // in the compaction pipeline, which still have ongoing scans.
if (nextStep != Action.MERGE) { if (!merge && (result != null) && !resultSwapped) {
if ((result != null) && (!resultSwapped)) { result.close();
result.close();
}
} }
releaseResources(); releaseResources();
} }
@ -240,7 +188,8 @@ public class MemStoreCompactor {
* Creation of the ImmutableSegment either by merge or copy-compact of the segments of the * Creation of the ImmutableSegment either by merge or copy-compact of the segments of the
* pipeline, based on the Compactor Iterator. The new ImmutableSegment is returned. * pipeline, based on the Compactor Iterator. The new ImmutableSegment is returned.
*/ */
private ImmutableSegment createSubstitution() throws IOException { private ImmutableSegment createSubstitution(MemStoreCompactionStrategy.Action action) throws
IOException {
ImmutableSegment result = null; ImmutableSegment result = null;
MemStoreSegmentsIterator iterator = null; MemStoreSegmentsIterator iterator = null;
@ -248,46 +197,49 @@ public class MemStoreCompactor {
switch (action) { switch (action) {
case COMPACT: case COMPACT:
iterator = new MemStoreCompactorSegmentsIterator(versionedList.getStoreSegments(), iterator = new MemStoreCompactorSegmentsIterator(versionedList.getStoreSegments(),
compactingMemStore.getComparator(), compactionKVMax, compactingMemStore.getStore()); compactingMemStore.getComparator(),
compactionKVMax, compactingMemStore.getStore());
result = SegmentFactory.instance().createImmutableSegmentByCompaction( result = SegmentFactory.instance().createImmutableSegmentByCompaction(
compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
versionedList.getNumOfCells(), compactingMemStore.getIndexType()); versionedList.getNumOfCells(), compactingMemStore.getIndexType(), action);
iterator.close(); iterator.close();
break; break;
case MERGE: case MERGE:
iterator = case MERGE_COUNT_UNIQUE_KEYS:
new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(), iterator =
compactingMemStore.getComparator(), new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(),
compactionKVMax); compactingMemStore.getComparator(), compactionKVMax);
result = SegmentFactory.instance().createImmutableSegmentByMerge( result = SegmentFactory.instance().createImmutableSegmentByMerge(
compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
versionedList.getNumOfCells(), versionedList.getStoreSegments(), versionedList.getNumOfCells(), versionedList.getStoreSegments(),
compactingMemStore.getIndexType()); compactingMemStore.getIndexType(), action);
iterator.close(); iterator.close();
break; break;
default: throw new RuntimeException("Unknown action " + action); // sanity check default:
throw new RuntimeException("Unknown action " + action); // sanity check
} }
return result; return result;
} }
/**----------------------------------------------------------------------
* Initiate the action according to user config, after its default is Action.MERGE
*/
@VisibleForTesting @VisibleForTesting
void initiateAction(MemoryCompactionPolicy compType) { void initiateCompactionStrategy(MemoryCompactionPolicy compType,
Configuration configuration, String cfName) throws IllegalArgumentIOException {
assert (compType !=MemoryCompactionPolicy.NONE);
switch (compType){ switch (compType){
case NONE: action = Action.NOOP; case BASIC: strategy = new BasicMemStoreCompactionStrategy(configuration, cfName);
break; break;
case BASIC: action = Action.MERGE; case EAGER: strategy = new EagerMemStoreCompactionStrategy(configuration, cfName);
break; break;
case EAGER: action = Action.COMPACT; case ADAPTIVE: strategy = new AdaptiveMemStoreCompactionStrategy(configuration, cfName);
break; break;
default: default:
throw new RuntimeException("Unknown memstore type " + compType); // sanity check // sanity check
throw new IllegalArgumentIOException("Unknown memory compaction type " + compType);
} }
} }
} }

View File

@ -51,13 +51,13 @@ public final class SegmentFactory {
// for compaction // for compaction
public ImmutableSegment createImmutableSegmentByCompaction(final Configuration conf, public ImmutableSegment createImmutableSegmentByCompaction(final Configuration conf,
final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells, final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
CompactingMemStore.IndexType idxType) CompactingMemStore.IndexType idxType, MemStoreCompactionStrategy.Action action)
throws IOException { throws IOException {
MemStoreLAB memStoreLAB = MemStoreLAB.newInstance(conf); MemStoreLAB memStoreLAB = MemStoreLAB.newInstance(conf);
return return
createImmutableSegment( createImmutableSegment(
conf,comparator,iterator,memStoreLAB,numOfCells,MemStoreCompactor.Action.COMPACT,idxType); conf,comparator,iterator,memStoreLAB,numOfCells,action,idxType);
} }
// create empty immutable segment // create empty immutable segment
@ -82,13 +82,14 @@ public final class SegmentFactory {
// for merge // for merge
public ImmutableSegment createImmutableSegmentByMerge(final Configuration conf, public ImmutableSegment createImmutableSegmentByMerge(final Configuration conf,
final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells, final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
List<ImmutableSegment> segments, CompactingMemStore.IndexType idxType) List<ImmutableSegment> segments, CompactingMemStore.IndexType idxType,
MemStoreCompactionStrategy.Action action)
throws IOException { throws IOException {
MemStoreLAB memStoreLAB = getMergedMemStoreLAB(conf, segments); MemStoreLAB memStoreLAB = getMergedMemStoreLAB(conf, segments);
return return
createImmutableSegment( createImmutableSegment(
conf,comparator,iterator,memStoreLAB,numOfCells,MemStoreCompactor.Action.MERGE,idxType); conf,comparator,iterator,memStoreLAB,numOfCells,action,idxType);
} }
@ -96,18 +97,18 @@ public final class SegmentFactory {
// for flattening // for flattening
public ImmutableSegment createImmutableSegmentByFlattening( public ImmutableSegment createImmutableSegmentByFlattening(
CSLMImmutableSegment segment, CompactingMemStore.IndexType idxType, CSLMImmutableSegment segment, CompactingMemStore.IndexType idxType,
MemStoreSizing memstoreSizing) { MemStoreSizing memstoreSizing, MemStoreCompactionStrategy.Action action) {
ImmutableSegment res = null; ImmutableSegment res = null;
switch (idxType) { switch (idxType) {
case CHUNK_MAP: case CHUNK_MAP:
res = new CellChunkImmutableSegment(segment, memstoreSizing); res = new CellChunkImmutableSegment(segment, memstoreSizing, action);
break; break;
case CSLM_MAP: case CSLM_MAP:
assert false; // non-flat segment can not be the result of flattening assert false; // non-flat segment can not be the result of flattening
break; break;
case ARRAY_MAP: case ARRAY_MAP:
res = new CellArrayImmutableSegment(segment, memstoreSizing); res = new CellArrayImmutableSegment(segment, memstoreSizing, action);
break; break;
} }
return res; return res;
} }
@ -116,7 +117,7 @@ public final class SegmentFactory {
//****** private methods to instantiate concrete store segments **********// //****** private methods to instantiate concrete store segments **********//
private ImmutableSegment createImmutableSegment(final Configuration conf, final CellComparator comparator, private ImmutableSegment createImmutableSegment(final Configuration conf, final CellComparator comparator,
MemStoreSegmentsIterator iterator, MemStoreLAB memStoreLAB, int numOfCells, MemStoreSegmentsIterator iterator, MemStoreLAB memStoreLAB, int numOfCells,
MemStoreCompactor.Action action, CompactingMemStore.IndexType idxType) { MemStoreCompactionStrategy.Action action, CompactingMemStore.IndexType idxType) {
ImmutableSegment res = null; ImmutableSegment res = null;
switch (idxType) { switch (idxType) {

View File

@ -18,6 +18,8 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import com.google.common.annotations.VisibleForTesting;
import java.util.List; import java.util.List;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -62,4 +64,29 @@ public class VersionedSegmentsList {
public int getNumOfSegments() { public int getNumOfSegments() {
return storeSegments.size(); return storeSegments.size();
} }
// Estimates fraction of unique keys
@VisibleForTesting
double getEstimatedUniquesFrac() {
int segmentCells = 0;
int maxCells = 0;
double est = 0;
for (ImmutableSegment s : storeSegments) {
double segmentUniques = s.getNumUniqueKeys();
if(segmentUniques != CellSet.UNKNOWN_NUM_UNIQUES) {
segmentCells = s.getCellsCount();
if(segmentCells > maxCells) {
maxCells = segmentCells;
est = segmentUniques / segmentCells;
}
}
// else ignore this segment specifically since if the unique number is unknown counting
// cells can be expensive
}
if(maxCells == 0) {
return 1.0;
}
return est;
}
} }

View File

@ -368,7 +368,7 @@ public class TestIOFencing {
Thread.sleep(1000); Thread.sleep(1000);
} }
} }
if (policy == MemoryCompactionPolicy.EAGER) { if (policy == MemoryCompactionPolicy.EAGER || policy == MemoryCompactionPolicy.ADAPTIVE) {
assertTrue(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT >= count); assertTrue(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT >= count);
} else { } else {
assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, count); assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, count);

View File

@ -26,13 +26,27 @@ import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
@ -74,7 +88,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
compactingSetUp(); compactingSetUp();
this.memstore = new CompactingMemStore(HBaseConfiguration.create(), CellComparatorImpl.COMPARATOR, this.memstore = new MyCompactingMemStore(HBaseConfiguration.create(), CellComparatorImpl
.COMPARATOR,
store, regionServicesForStores, MemoryCompactionPolicy.EAGER); store, regionServicesForStores, MemoryCompactionPolicy.EAGER);
} }
@ -483,7 +498,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER; MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(compactionType)); String.valueOf(compactionType));
((CompactingMemStore)memstore).initiateType(compactionType); ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
byte[] row = Bytes.toBytes("testrow"); byte[] row = Bytes.toBytes("testrow");
byte[] fam = Bytes.toBytes("testfamily"); byte[] fam = Bytes.toBytes("testfamily");
@ -498,7 +513,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
memstore.add(new KeyValue(row, fam, qf3, 1, val), null); memstore.add(new KeyValue(row, fam, qf3, 1, val), null);
// Creating a pipeline // Creating a pipeline
((CompactingMemStore)memstore).disableCompaction(); ((MyCompactingMemStore)memstore).disableCompaction();
((CompactingMemStore)memstore).flushInMemory(); ((CompactingMemStore)memstore).flushInMemory();
// Adding value to "new" memstore // Adding value to "new" memstore
@ -513,7 +528,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
List<KeyValueScanner> scanners = memstore.getScanners(0); List<KeyValueScanner> scanners = memstore.getScanners(0);
// Shouldn't putting back the chunks to pool,since some scanners are opening // Shouldn't putting back the chunks to pool,since some scanners are opening
// based on their data // based on their data
((CompactingMemStore)memstore).enableCompaction(); ((MyCompactingMemStore)memstore).enableCompaction();
// trigger compaction // trigger compaction
((CompactingMemStore)memstore).flushInMemory(); ((CompactingMemStore)memstore).flushInMemory();
@ -564,60 +579,6 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
assertTrue(chunkCreator.getPoolSize() > 0); assertTrue(chunkCreator.getPoolSize() > 0);
} }
@Test
public void testFlatteningToCellChunkMap() throws IOException {
// set memstore to flat into CellChunkMap
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(compactionType));
((CompactingMemStore)memstore).initiateType(compactionType);
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
((CompactingMemStore)memstore).setIndexType();
int numOfCells = 8;
String[] keys1 = { "A", "A", "B", "C", "D", "D", "E", "F" }; //A1, A2, B3, C4, D5, D6, E7, F8
// make one cell
byte[] row = Bytes.toBytes(keys1[0]);
byte[] val = Bytes.toBytes(keys1[0] + 0);
KeyValue kv =
new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
System.currentTimeMillis(), val);
// test 1 bucket
int totalCellsLen = addRowsByKeys(memstore, keys1);
long oneCellOnCSLMHeapSize =
ClassSize.align(
ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + KeyValueUtil
.length(kv));
long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and flatten
assertEquals(0, memstore.getSnapshot().getCellsCount());
// One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode.
// totalCellsLen should remain the same
long oneCellOnCCMHeapSize =
ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
+ numOfCells * oneCellOnCCMHeapSize;
assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
MemStoreSize size = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
region.decrMemStoreSize(size); // simulate flusher
ImmutableSegment s = memstore.getSnapshot();
assertEquals(numOfCells, s.getCellsCount());
assertEquals(0, regionServicesForStores.getMemStoreSize());
memstore.clearSnapshot(snapshot.getId());
}
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
// Compaction tests // Compaction tests
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@ -629,7 +590,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
memstore.getConfiguration() memstore.getConfiguration()
.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(compactionType)); .set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(compactionType));
((CompactingMemStore)memstore).initiateType(compactionType); ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4 String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4
@ -668,7 +629,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(compactionType)); String.valueOf(compactionType));
((CompactingMemStore)memstore).initiateType(compactionType); memstore.getConfiguration().set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY,
String.valueOf(1));
((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
String[] keys1 = { "A", "A", "B", "C" }; String[] keys1 = { "A", "A", "B", "C" };
String[] keys2 = { "A", "B", "D" }; String[] keys2 = { "A", "B", "D" };
@ -724,7 +687,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER; MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(compactionType)); String.valueOf(compactionType));
((CompactingMemStore)memstore).initiateType(compactionType); ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
String[] keys1 = { "A", "A", "B", "C" }; String[] keys1 = { "A", "A", "B", "C" };
String[] keys2 = { "A", "B", "D" }; String[] keys2 = { "A", "B", "D" };
String[] keys3 = { "D", "B", "B" }; String[] keys3 = { "D", "B", "B" };
@ -753,7 +716,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
((CompactingMemStore) memstore).disableCompaction(); ((MyCompactingMemStore) memstore).disableCompaction();
MemStoreSize size = memstore.getFlushableSize(); MemStoreSize size = memstore.getFlushableSize();
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
assertEquals(0, memstore.getSnapshot().getCellsCount()); assertEquals(0, memstore.getSnapshot().getCellsCount());
@ -769,7 +732,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
+ 3 * oneCellOnCSLMHeapSize; + 3 * oneCellOnCSLMHeapSize;
assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize()); assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize());
((CompactingMemStore)memstore).enableCompaction(); ((MyCompactingMemStore)memstore).enableCompaction();
size = memstore.getFlushableSize(); size = memstore.getFlushableSize();
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
assertEquals(0, memstore.getSnapshot().getCellsCount()); assertEquals(0, memstore.getSnapshot().getCellsCount());
@ -793,6 +756,67 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
memstore.clearSnapshot(snapshot.getId()); memstore.clearSnapshot(snapshot.getId());
} }
@Test
public void testMagicCompaction3Buckets() throws IOException {
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.ADAPTIVE;
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(compactionType));
memstore.getConfiguration().setDouble(
AdaptiveMemStoreCompactionStrategy.ADAPTIVE_COMPACTION_THRESHOLD_KEY, 0.45);
memstore.getConfiguration().setInt(
AdaptiveMemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 2);
memstore.getConfiguration().setInt(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 1);
((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
String[] keys1 = { "A", "B", "D" };
String[] keys2 = { "A" };
String[] keys3 = { "A", "A", "B", "C" };
String[] keys4 = { "D", "B", "B" };
int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 3 cells.
int oneCellOnCSLMHeapSize = 120;
assertEquals(totalCellsLen1, region.getMemStoreSize());
long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 3 * oneCellOnCSLMHeapSize;
assertEquals(totalHeapSize, memstore.heapSize());
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline - flatten
assertEquals(3, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
assertEquals(1.0,
((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
assertEquals(0, memstore.getSnapshot().getCellsCount());
addRowsByKeys(memstore, keys2);// Adding 1 more cell - flatten.
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
assertEquals(4, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
assertEquals(1.0,
((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
assertEquals(0, memstore.getSnapshot().getCellsCount());
addRowsByKeys(memstore, keys3);// Adding 4 more cells - merge.
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
assertEquals(8, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
assertEquals((4.0 / 8.0),
((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
assertEquals(0, memstore.getSnapshot().getCellsCount());
addRowsByKeys(memstore, keys4);// 3 more cells added - compact (or not)
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
int numCells = ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells();
assertTrue(4 == numCells || 11 == numCells);
assertEquals(0, memstore.getSnapshot().getCellsCount());
MemStoreSize size = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
region.decrMemStoreSize(size); // simulate flusher
ImmutableSegment s = memstore.getSnapshot();
numCells = s.getCellsCount();
assertTrue(4 == numCells || 11 == numCells);
assertEquals(0, regionServicesForStores.getMemStoreSize());
memstore.clearSnapshot(snapshot.getId());
}
private int addRowsByKeys(final AbstractMemStore hmc, String[] keys) { private int addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
byte[] fam = Bytes.toBytes("testfamily"); byte[] fam = Bytes.toBytes("testfamily");
byte[] qf = Bytes.toBytes("testqualifier"); byte[] qf = Bytes.toBytes("testqualifier");
@ -826,4 +850,25 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
} }
} }
static protected class MyCompactingMemStore extends CompactingMemStore {
public MyCompactingMemStore(Configuration conf, CellComparator c, HStore store,
RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
throws IOException {
super(conf, c, store, regionServices, compactionPolicy);
}
void disableCompaction() {
allowCompaction.set(false);
}
void enableCompaction() {
allowCompaction.set(true);
}
void initiateType(MemoryCompactionPolicy compactionType, Configuration conf)
throws IllegalArgumentIOException {
compactor.initiateCompactionStrategy(compactionType, conf, "CF_TEST");
}
}
} }

View File

@ -21,7 +21,11 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -76,7 +80,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
String.valueOf(MemoryCompactionPolicy.EAGER)); String.valueOf(MemoryCompactionPolicy.EAGER));
this.memstore = this.memstore =
new CompactingMemStore(conf, CellComparatorImpl.COMPARATOR, store, new MyCompactingMemStore(conf, CellComparatorImpl.COMPARATOR, store,
regionServicesForStores, MemoryCompactionPolicy.EAGER); regionServicesForStores, MemoryCompactionPolicy.EAGER);
} }
@ -229,7 +233,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
((CompactingMemStore) memstore).disableCompaction(); ((MyCompactingMemStore) memstore).disableCompaction();
size = memstore.getFlushableSize(); size = memstore.getFlushableSize();
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
totalHeapSize2 = totalHeapSize2 + CSLMImmutableSegment.DEEP_OVERHEAD_CSLM; totalHeapSize2 = totalHeapSize2 + CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
@ -244,7 +248,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
assertEquals(totalHeapSize1 + totalHeapSize2 + totalHeapSize3, assertEquals(totalHeapSize1 + totalHeapSize2 + totalHeapSize3,
((CompactingMemStore) memstore).heapSize()); ((CompactingMemStore) memstore).heapSize());
((CompactingMemStore) memstore).enableCompaction(); ((MyCompactingMemStore) memstore).enableCompaction();
size = memstore.getFlushableSize(); size = memstore.getFlushableSize();
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
@ -293,7 +297,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(compactionType)); String.valueOf(compactionType));
((CompactingMemStore)memstore).initiateType(compactionType); ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
addRowsByKeys(memstore, keys1); addRowsByKeys(memstore, keys1);
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline should not compact ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline should not compact
@ -311,7 +315,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
} }
assertEquals(12, counter2); assertEquals(12, counter2);
((CompactingMemStore) memstore).disableCompaction(); ((MyCompactingMemStore) memstore).disableCompaction();
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening
assertEquals(0, memstore.getSnapshot().getCellsCount()); assertEquals(0, memstore.getSnapshot().getCellsCount());
@ -330,7 +334,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
} }
assertEquals(16, counter4); assertEquals(16, counter4);
((CompactingMemStore) memstore).enableCompaction(); ((MyCompactingMemStore) memstore).enableCompaction();
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
@ -372,7 +376,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(compactionType)); String.valueOf(compactionType));
((CompactingMemStore)memstore).initiateType(compactionType); ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
testTimeRange(false); testTimeRange(false);
} }
@ -603,6 +607,61 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
assertTrue(chunkCount > 0); assertTrue(chunkCount > 0);
} }
@Test
public void testFlatteningToCellChunkMap() throws IOException {
if(!toCellChunkMap) {
return;
}
// set memstore to flat into CellChunkMap
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(compactionType));
((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
((CompactingMemStore)memstore).setIndexType();
int numOfCells = 8;
String[] keys1 = { "A", "A", "B", "C", "D", "D", "E", "F" }; //A1, A2, B3, C4, D5, D6, E7, F8
// make one cell
byte[] row = Bytes.toBytes(keys1[0]);
byte[] val = Bytes.toBytes(keys1[0] + 0);
KeyValue kv =
new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
System.currentTimeMillis(), val);
// test 1 bucket
long totalCellsLen = addRowsByKeys(memstore, keys1);
long oneCellOnCSLMHeapSize =
ClassSize.align(
ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + KeyValueUtil
.length(kv));
long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and flatten
assertEquals(0, memstore.getSnapshot().getCellsCount());
// One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode.
// totalCellsLen should remain the same
long oneCellOnCCMHeapSize =
ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
+ numOfCells * oneCellOnCCMHeapSize;
assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
MemStoreSize size = memstore.getFlushableSize();
MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
region.decrMemStoreSize(size); // simulate flusher
ImmutableSegment s = memstore.getSnapshot();
assertEquals(numOfCells, s.getCellsCount());
assertEquals(0, regionServicesForStores.getMemStoreSize());
memstore.clearSnapshot(snapshot.getId());
}
private long addRowsByKeys(final AbstractMemStore hmc, String[] keys) { private long addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
byte[] fam = Bytes.toBytes("testfamily"); byte[] fam = Bytes.toBytes("testfamily");

View File

@ -79,6 +79,7 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression;
@ -1177,7 +1178,8 @@ public class TestHStore {
} }
@Test @Test
public void testFlushBeforeCompletingScanWithFilterHint() throws IOException, InterruptedException { public void testFlushBeforeCompletingScanWithFilterHint() throws IOException,
InterruptedException {
final AtomicBoolean timeToGetHint = new AtomicBoolean(false); final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
final int expectedSize = 2; final int expectedSize = 2;
testFlushBeforeCompletingScan(new MyListHook() { testFlushBeforeCompletingScan(new MyListHook() {
@ -1364,7 +1366,8 @@ public class TestHStore {
myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing); myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
long snapshotId = id++; long snapshotId = id++;
// push older data into snapshot -- phase (1/4) // push older data into snapshot -- phase (1/4)
StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY); StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId, FlushLifeCycleTracker
.DUMMY);
storeFlushCtx.prepare(); storeFlushCtx.prepare();
// insert current data into active -- phase (2/4) // insert current data into active -- phase (2/4)
@ -1464,7 +1467,7 @@ public class TestHStore {
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName()); conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize)); conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
// Set the lower threshold to invoke the "MERGE" policy // Set the lower threshold to invoke the "MERGE" policy
conf.set(MemStoreCompactor.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0)); conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
byte[] value = Bytes.toBytes("thisisavarylargevalue"); byte[] value = Bytes.toBytes("thisisavarylargevalue");
@ -1551,8 +1554,8 @@ public class TestHStore {
private class MyStore extends HStore { private class MyStore extends HStore {
private final MyStoreHook hook; private final MyStoreHook hook;
MyStore(final HRegion region, final ColumnFamilyDescriptor family, final Configuration confParam, MyStore(final HRegion region, final ColumnFamilyDescriptor family, final Configuration
MyStoreHook hook, boolean switchToPread) throws IOException { confParam, MyStoreHook hook, boolean switchToPread) throws IOException {
super(region, family, confParam); super(region, family, confParam);
this.hook = hook; this.hook = hook;
} }
@ -1669,7 +1672,8 @@ public class TestHStore {
private static class MyMemStoreCompactor extends MemStoreCompactor { private static class MyMemStoreCompactor extends MemStoreCompactor {
private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1); private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy compactionPolicy) { public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy
compactionPolicy) throws IllegalArgumentIOException {
super(compactingMemStore, compactionPolicy); super(compactingMemStore, compactionPolicy);
} }
@ -1697,7 +1701,8 @@ public class TestHStore {
} }
@Override @Override
protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) { protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
throws IllegalArgumentIOException {
return new MyMemStoreCompactor(this, compactionPolicy); return new MyMemStoreCompactor(this, compactionPolicy);
} }

View File

@ -141,7 +141,7 @@ public class TestRecoveredEdits {
// Our 0000000000000016310 is 10MB. Most of the edits are for one region. Lets assume that if // Our 0000000000000016310 is 10MB. Most of the edits are for one region. Lets assume that if
// we flush at 1MB, that there are at least 3 flushed files that are there because of the // we flush at 1MB, that there are at least 3 flushed files that are there because of the
// replay of edits. // replay of edits.
if(policy == MemoryCompactionPolicy.EAGER) { if(policy == MemoryCompactionPolicy.EAGER || policy == MemoryCompactionPolicy.ADAPTIVE) {
assertTrue("Files count=" + storeFiles.size(), storeFiles.size() >= 1); assertTrue("Files count=" + storeFiles.size(), storeFiles.size() >= 1);
} else { } else {
assertTrue("Files count=" + storeFiles.size(), storeFiles.size() > 10); assertTrue("Files count=" + storeFiles.size(), storeFiles.size() > 10);

View File

@ -775,7 +775,7 @@ public class TestWalAndCompactingMemStoreFlush {
conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(MemoryCompactionPolicy.BASIC)); String.valueOf(MemoryCompactionPolicy.BASIC));
// length of pipeline that requires merge // length of pipeline that requires merge
conf.setInt(MemStoreCompactor.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1); conf.setInt(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1);
// Intialize the HRegion // Intialize the HRegion
HRegion region = initHRegion("testSelectiveFlushWithBasicAndMerge", conf); HRegion region = initHRegion("testSelectiveFlushWithBasicAndMerge", conf);