From 526d2826f5cce4f3b21326657d3c17a651aa6975 Mon Sep 17 00:00:00 2001
From: eshcar
Date: Wed, 1 Nov 2017 16:48:40 +0200
Subject: [PATCH] HBASE-16417: In-memory MemStore Policy for Flattening and
 Compactions

---
 .../hadoop/hbase/MemoryCompactionPolicy.java       |   8 +-
 .../org/apache/hadoop/hbase/CellUtil.java          |  29 +++
 .../apache/hadoop/hbase/util/ClassSize.java        |   3 +-
 .../AdaptiveMemStoreCompactionStrategy.java        | 112 +++++++++++
 .../BasicMemStoreCompactionStrategy.java           |  43 +++++
 .../CellArrayImmutableSegment.java                 |  57 +++++-
 .../CellChunkImmutableSegment.java                 |  58 +++++-
 .../hadoop/hbase/regionserver/CellSet.java         |  20 +-
 .../regionserver/CompactingMemStore.java           |  37 +---
 .../regionserver/CompactionPipeline.java           |  12 +-
 .../EagerMemStoreCompactionStrategy.java           |  36 ++++
 .../hadoop/hbase/regionserver/HStore.java          |  24 ++-
 .../hbase/regionserver/ImmutableSegment.java       |  12 +-
 .../MemStoreCompactionStrategy.java                | 114 ++++++++++++
 .../hbase/regionserver/MemStoreCompactor.java      | 170 ++++++-----------
 .../hbase/regionserver/SegmentFactory.java         |  31 ++--
 .../regionserver/VersionedSegmentsList.java        |  27 +++
 .../apache/hadoop/hbase/TestIOFencing.java         |   2 +-
 .../regionserver/TestCompactingMemStore.java       | 175 +++++++++++-------
 .../TestCompactingToCellFlatMapMemStore.java       |  75 +++++++-
 .../hadoop/hbase/regionserver/TestHStore.java      |  19 +-
 .../regionserver/TestRecoveredEdits.java           |   2 +-
 .../TestWalAndCompactingMemStoreFlush.java         |   2 +-
 23 files changed, 790 insertions(+), 278 deletions(-)
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AdaptiveMemStoreCompactionStrategy.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BasicMemStoreCompactionStrategy.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/EagerMemStoreCompactionStrategy.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactionStrategy.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MemoryCompactionPolicy.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MemoryCompactionPolicy.java
index 654e7ab6062..099ea405459 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MemoryCompactionPolicy.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MemoryCompactionPolicy.java
@@ -42,5 +42,11 @@ public enum MemoryCompactionPolicy {
    * on-disk compaction does after the data is flushed to disk). This policy is most useful for
    * applications with high data churn or small working sets.
    */
-  EAGER
+  EAGER,
+  /**
+   * Adaptive compaction adapts to the workload. It applies either index compaction or data
+   * compaction based on the ratio of duplicate cells in the data.
+   */
+  ADAPTIVE
+
 }
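The new ADAPTIVE value becomes selectable wherever the memstore type is configured. A minimal sketch of opting in, assuming the "hbase.hregion.compacting.memstore.type" key (CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY in this code base); only the configuration call is shown:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;

// Sketch: select the new ADAPTIVE in-memory compaction policy via configuration.
public class AdaptivePolicyExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.hregion.compacting.memstore.type",
        String.valueOf(MemoryCompactionPolicy.ADAPTIVE));
    System.out.println("memstore type: "
        + conf.get("hbase.hregion.compacting.memstore.type"));
  }
}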
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index c2fb869a2e8..3fdcc735975 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -1612,6 +1612,35 @@ public final class CellUtil {
     return matchingColumn(left, right);
   }

+  public static boolean matchingRowColumnBytes(final Cell left, final Cell right) {
+    int lrowlength = left.getRowLength();
+    int rrowlength = right.getRowLength();
+    int lfamlength = left.getFamilyLength();
+    int rfamlength = right.getFamilyLength();
+    int lqlength = left.getQualifierLength();
+    int rqlength = right.getQualifierLength();
+    // match length
+    if ((lrowlength + lfamlength + lqlength) !=
+        (rrowlength + rfamlength + rqlength)) {
+      return false;
+    }
+
+    // match row
+    if (!Bytes.equals(left.getRowArray(), left.getRowOffset(), lrowlength, right.getRowArray(),
+        right.getRowOffset(), rrowlength)) {
+      return false;
+    }
+    // match family
+    if (!Bytes.equals(left.getFamilyArray(), left.getFamilyOffset(), lfamlength,
+        right.getFamilyArray(), right.getFamilyOffset(), rfamlength)) {
+      return false;
+    }
+    // match qualifier
+    return Bytes.equals(left.getQualifierArray(), left.getQualifierOffset(),
+        lqlength, right.getQualifierArray(), right.getQualifierOffset(),
+        rqlength);
+  }
+
   /**
    * Compares the cell's qualifier with the given byte[]
    * @param left the cell for which the qualifier has to be compared
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
index d9ea7610738..efcf8d0bec2 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
@@ -330,9 +330,10 @@ public class ClassSize {
     TIMERANGE = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2 + Bytes.SIZEOF_BOOLEAN);

     SYNC_TIMERANGE_TRACKER = align(ClassSize.OBJECT + 2 * REFERENCE);
+    NON_SYNC_TIMERANGE_TRACKER = align(ClassSize.OBJECT + 2 * Bytes.SIZEOF_LONG);

-    CELL_SET = align(OBJECT + REFERENCE);
+    CELL_SET = align(OBJECT + REFERENCE + Bytes.SIZEOF_INT);

     STORE_SERVICES = align(OBJECT + REFERENCE + ATOMIC_LONG);
   }
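The new CellUtil helper above compares only the row, family and qualifier bytes, ignoring timestamp and type, which is exactly what the unique-key counting later in this patch needs. A standalone usage sketch (the KeyValue constructor is the standard row/family/qualifier/timestamp/value one):

import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class MatchingRowColumnBytesDemo {
  public static void main(String[] args) {
    byte[] row = Bytes.toBytes("r1");
    byte[] fam = Bytes.toBytes("f");
    // Two versions of the same cell key: only timestamp and value differ.
    KeyValue v1 = new KeyValue(row, fam, Bytes.toBytes("q"), 1L, Bytes.toBytes("a"));
    KeyValue v2 = new KeyValue(row, fam, Bytes.toBytes("q"), 2L, Bytes.toBytes("b"));
    // A different qualifier, hence a different key.
    KeyValue other = new KeyValue(row, fam, Bytes.toBytes("q2"), 1L, Bytes.toBytes("c"));
    System.out.println(CellUtil.matchingRowColumnBytes(v1, v2));    // true
    System.out.println(CellUtil.matchingRowColumnBytes(v1, other)); // false
  }
}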
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AdaptiveMemStoreCompactionStrategy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AdaptiveMemStoreCompactionStrategy.java
new file mode 100644
index 00000000000..232ffe35cae
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AdaptiveMemStoreCompactionStrategy.java
@@ -0,0 +1,112 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Adaptive is a heuristic that chooses whether to apply data compaction or not based on the
+ * level of redundancy in the data. Adaptive triggers redundancy elimination only for those
+ * stores where a positive impact is expected.
+ *
+ * Adaptive uses two parameters to determine whether to perform redundancy elimination.
+ * The first parameter, u, estimates the ratio of unique keys in the memory store based on the
+ * fraction of unique keys encountered during the previous merge of segment indices.
+ * The second is the perceived probability (compactionProbability) that the store can benefit
+ * from redundancy elimination. Initially, compactionProbability=0.5; it then grows by a factor
+ * of 1.02 (roughly 2%) whenever a compaction is successful and shrinks by the same factor
+ * whenever a compaction does not meet expectations. It is reset back to the default value
+ * (namely 0.5) upon disk flush.
+ *
+ * Adaptive triggers redundancy elimination with probability compactionProbability if the
+ * fraction of redundant keys, 1-u, exceeds the threshold parameter compactionThreshold.
+ */
+@InterfaceAudience.Private
+public class AdaptiveMemStoreCompactionStrategy extends MemStoreCompactionStrategy {
+
+  private static final String name = "ADAPTIVE";
+  public static final String ADAPTIVE_COMPACTION_THRESHOLD_KEY =
+      "hbase.hregion.compacting.memstore.adaptive.compaction.threshold";
+  private static final double ADAPTIVE_COMPACTION_THRESHOLD_DEFAULT = 0.5;
+  public static final String ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_KEY =
+      "hbase.hregion.compacting.memstore.adaptive.compaction.probability";
+  private static final double ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_DEFAULT = 0.5;
+  private static final double ADAPTIVE_PROBABILITY_FACTOR = 1.02;
+
+  private double compactionThreshold;
+  private double initialCompactionProbability;
+  private double compactionProbability;
+  private Random rand = new Random();
+  private double numCellsInVersionedList = 0;
+  private boolean compacted = false;
+
+  public AdaptiveMemStoreCompactionStrategy(Configuration conf, String cfName) {
+    super(conf, cfName);
+    compactionThreshold = conf.getDouble(ADAPTIVE_COMPACTION_THRESHOLD_KEY,
+        ADAPTIVE_COMPACTION_THRESHOLD_DEFAULT);
+    initialCompactionProbability = conf.getDouble(ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_KEY,
+        ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_DEFAULT);
+    resetStats();
+  }
+
+  @Override
+  public Action getAction(VersionedSegmentsList versionedList) {
+    if (versionedList.getEstimatedUniquesFrac() < 1.0 - compactionThreshold) {
+      double r = rand.nextDouble();
+      if (r < compactionProbability) {
+        numCellsInVersionedList = versionedList.getNumOfCells();
+        compacted = true;
+        return compact(versionedList,
+            name + " (compaction probability=" + compactionProbability + ")");
+      }
+    }
+    compacted = false;
+    return simpleMergeOrFlatten(versionedList,
+        name + " (compaction probability=" + compactionProbability + ")");
+  }
+
+  @Override
+  public void updateStats(Segment replacement) {
+    if (compacted) {
+      if (replacement.getCellsCount() / numCellsInVersionedList < 1.0 - compactionThreshold) {
+        // compaction was a good decision - increase probability
+        compactionProbability *= ADAPTIVE_PROBABILITY_FACTOR;
+        if (compactionProbability > 1.0) {
+          compactionProbability = 1.0;
+        }
+      } else {
+        // compaction was NOT a good decision - decrease probability
+        compactionProbability /= ADAPTIVE_PROBABILITY_FACTOR;
+      }
+    }
+  }
+
+  @Override
+  public void resetStats() {
+    compactionProbability = initialCompactionProbability;
+  }
+
+  protected Action getMergingAction() {
+    return Action.MERGE_COUNT_UNIQUE_KEYS;
+  }
+
+  protected Action getFlattenAction() {
+    return Action.FLATTEN;
+  }
+}
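To make the control loop above concrete, here is a self-contained restatement of the heuristic, detached from the HBase types (names are illustrative; the real class wires into VersionedSegmentsList and Segment):

import java.util.Random;

// Illustrative re-statement of the adaptive heuristic: decide with probability
// `probability` when the unique-key fraction u is low enough, then reward or
// penalize the decision multiplicatively based on the observed shrinkage.
public class AdaptiveHeuristicSketch {
  static final double THRESHOLD = 0.5; // compactionThreshold default
  static final double FACTOR = 1.02;   // ADAPTIVE_PROBABILITY_FACTOR
  double probability = 0.5;            // reset to this value on disk flush
  final Random rand = new Random();

  // u = estimated fraction of unique keys in the pipeline.
  boolean shouldCompact(double u) {
    // Redundancy 1-u must exceed the threshold, i.e. u < 1 - THRESHOLD.
    return (u < 1.0 - THRESHOLD) && (rand.nextDouble() < probability);
  }

  // cellsAfterOverBefore = cells surviving the compaction / cells before it.
  void update(double cellsAfterOverBefore) {
    if (cellsAfterOverBefore < 1.0 - THRESHOLD) {
      probability = Math.min(1.0, probability * FACTOR); // good decision
    } else {
      probability /= FACTOR;                             // bad decision
    }
  }
}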
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BasicMemStoreCompactionStrategy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BasicMemStoreCompactionStrategy.java
new file mode 100644
index 00000000000..d816fc16d32
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BasicMemStoreCompactionStrategy.java
@@ -0,0 +1,43 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Basic strategy chooses between two actions: flattening a segment or merging the indices of
+ * all segments in the pipeline.
+ * If the number of segments in the pipeline exceeds the limit defined in
+ * MemStoreCompactionStrategy, merge is applied; otherwise some segment is flattened.
+ */
+@InterfaceAudience.Private
+public class BasicMemStoreCompactionStrategy extends MemStoreCompactionStrategy {
+
+  private static final String name = "BASIC";
+
+  public BasicMemStoreCompactionStrategy(Configuration conf, String cfName) {
+    super(conf, cfName);
+  }
+
+  @Override
+  public Action getAction(VersionedSegmentsList versionedList) {
+    return simpleMergeOrFlatten(versionedList, name);
+  }
+}
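The flat-segment builders below (CellArrayImmutableSegment here and CellChunkImmutableSegment further down) count unique keys with the same single pass over the sorted cell stream, comparing each cell to its predecessor. Schematically, with strings standing in for cell keys:

// Counting unique keys in an already-sorted stream, as the segment builders
// below do with CellUtil.matchingRowColumnBytes on adjacent cells.
public class UniqueKeyCount {
  static int countUniques(String[] sortedKeys) {
    int uniques = 0;
    String prev = null;
    for (String k : sortedKeys) {
      if (prev == null || !k.equals(prev)) {
        uniques++; // first cell, or key differs from the previous one
      }
      prev = k;
    }
    return uniques;
  }

  public static void main(String[] args) {
    System.out.println(countUniques(new String[]{"A", "A", "B", "C", "C", "C"})); // 3
  }
}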
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
index 8cd5f2a420f..0e80b1d0edd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;

 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.util.ClassSize;

@@ -42,7 +43,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
    * The given iterator returns the Cells that "survived" the compaction.
    */
   protected CellArrayImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
-      MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactor.Action action) {
+      MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactionStrategy.Action action) {
     super(null, comparator, memStoreLAB); // initiailize the CellSet with NULL
     incSize(0, DEEP_OVERHEAD_CAM);
     // build the new CellSet based on CellArrayMap and update the CellSet of the new Segment
@@ -54,12 +55,14 @@
    * of CSLMImmutableSegment
    * The given iterator returns the Cells that "survived" the compaction.
    */
-  protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemStoreSizing memstoreSizing) {
+  protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemStoreSizing memstoreSizing,
+      MemStoreCompactionStrategy.Action action) {
     super(segment); // initiailize the upper class
     incSize(0, DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM);
     int numOfCells = segment.getCellsCount();
     // build the new CellSet based on CellChunkMap and update the CellSet of this Segment
-    reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet());
+    reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet(),
+        action);
     // arrange the meta-data size, decrease all meta-data sizes related to SkipList;
     // add sizes of CellArrayMap entry (reinitializeCellSet doesn't take the care for the sizes)
     long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
@@ -81,14 +84,18 @@
   /*------------------------------------------------------------------------*/
   // Create CellSet based on CellArrayMap from compacting iterator
   private void initializeCellSet(int numOfCells, MemStoreSegmentsIterator iterator,
-      MemStoreCompactor.Action action) {
+      MemStoreCompactionStrategy.Action action) {
+    boolean merge = (action == MemStoreCompactionStrategy.Action.MERGE ||
+        action == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS);
     Cell[] cells = new Cell[numOfCells];   // build the Cell Array
     int i = 0;
+    int numUniqueKeys = 0;
+    Cell prev = null;
     while (iterator.hasNext()) {
       Cell c = iterator.next();
       // The scanner behind the iterator is doing all the elimination logic
-      if (action == MemStoreCompactor.Action.MERGE) {
+      if (merge) {
         // if this is merge we just move the Cell object without copying MSLAB
         // the sizes still need to be updated in the new segment
         cells[i] = c;
@@ -99,11 +106,27 @@
       // second parameter true, because in compaction/merge the addition of the cell to new segment
       // is always successful
       updateMetaInfo(c, true, null); // updates the size per cell
+      if (action == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS) {
+        // counting number of unique keys
+        if (prev != null) {
+          if (!CellUtil.matchingRowColumnBytes(prev, c)) {
+            numUniqueKeys++;
+          }
+        } else {
+          numUniqueKeys++;
+        }
+      }
+      prev = c;
       i++;
     }
+    if (action == MemStoreCompactionStrategy.Action.COMPACT) {
+      numUniqueKeys = numOfCells;
+    } else if (action != MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS) {
+      numUniqueKeys = CellSet.UNKNOWN_NUM_UNIQUES;
+    }
     // build the immutable CellSet
     CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, i, false);
-    this.setCellSet(null, new CellSet(cam)); // update the CellSet of this Segment
+    this.setCellSet(null,
new CellSet(cam, numUniqueKeys)); // update the CellSet of this Segment } /*------------------------------------------------------------------------*/ @@ -111,22 +134,40 @@ public class CellArrayImmutableSegment extends ImmutableSegment { // (without compacting iterator) // We do not consider cells bigger than chunks! private void reinitializeCellSet( - int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet) { + int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet, + MemStoreCompactionStrategy.Action action) { Cell[] cells = new Cell[numOfCells]; // build the Cell Array Cell curCell; int idx = 0; + int numUniqueKeys=0; + Cell prev = null; try { while ((curCell = segmentScanner.next()) != null) { cells[idx++] = curCell; + if(action == MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) { + //counting number of unique keys + if (prev != null) { + if (!CellUtil.matchingRowColumn(prev, curCell)) { + numUniqueKeys++; + } + } else { + numUniqueKeys++; + } + } + prev = curCell; } } catch (IOException ie) { throw new IllegalStateException(ie); } finally { segmentScanner.close(); } + if(action != MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) { + numUniqueKeys = CellSet.UNKNOWN_NUM_UNIQUES; + } // build the immutable CellSet CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, idx, false); - this.setCellSet(oldCellSet, new CellSet(cam)); // update the CellSet of this Segment + // update the CellSet of this Segment + this.setCellSet(oldCellSet, new CellSet(cam, numUniqueKeys)); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java index 4ef0657ca5e..7db00a0c48f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.ByteBufferKeyValue; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.yetus.audience.InterfaceAudience; @@ -48,7 +49,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment { * The given iterator returns the Cells that "survived" the compaction. */ protected CellChunkImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator, - MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactor.Action action) { + MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactionStrategy.Action action) { super(null, comparator, memStoreLAB); // initialize the CellSet with NULL incSize(0, DEEP_OVERHEAD_CCM); // initiate the heapSize with the size of the segment metadata // build the new CellSet based on CellArrayMap and update the CellSet of the new Segment @@ -61,12 +62,13 @@ public class CellChunkImmutableSegment extends ImmutableSegment { * The given iterator returns the Cells that "survived" the compaction. 
*/ protected CellChunkImmutableSegment(CSLMImmutableSegment segment, - MemStoreSizing memstoreSizing) { + MemStoreSizing memstoreSizing, MemStoreCompactionStrategy.Action action) { super(segment); // initiailize the upper class incSize(0,-CSLMImmutableSegment.DEEP_OVERHEAD_CSLM + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM); int numOfCells = segment.getCellsCount(); // build the new CellSet based on CellChunkMap - reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet()); + reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet(), + action); // arrange the meta-data size, decrease all meta-data sizes related to SkipList; // add sizes of CellChunkMap entry, decrease also Cell object sizes // (reinitializeCellSet doesn't take the care for the sizes) @@ -90,15 +92,17 @@ public class CellChunkImmutableSegment extends ImmutableSegment { /*------------------------------------------------------------------------*/ // Create CellSet based on CellChunkMap from compacting iterator private void initializeCellSet(int numOfCells, MemStoreSegmentsIterator iterator, - MemStoreCompactor.Action action) { + MemStoreCompactionStrategy.Action action) { // calculate how many chunks we will need for index int chunkSize = ChunkCreator.getInstance().getChunkSize(); int numOfCellsInChunk = CellChunkMap.NUM_OF_CELL_REPS_IN_CHUNK; - int numberOfChunks = calculateNumberOfChunks(numOfCells,numOfCellsInChunk); + int numberOfChunks = calculateNumberOfChunks(numOfCells, numOfCellsInChunk); int numOfCellsAfterCompaction = 0; int currentChunkIdx = 0; int offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER; + int numUniqueKeys=0; + Cell prev = null; // all index Chunks are allocated from ChunkCreator Chunk[] chunks = new Chunk[numberOfChunks]; for (int i=0; i < numberOfChunks; i++) { @@ -112,7 +116,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment { currentChunkIdx++; // continue to the next index chunk offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER; } - if (action == MemStoreCompactor.Action.COMPACT) { + if (action == MemStoreCompactionStrategy.Action.COMPACT) { c = maybeCloneWithAllocator(c); // for compaction copy cell to the new segment (MSLAB copy) } offsetInCurentChunk = // add the Cell reference to the index chunk @@ -122,11 +126,27 @@ public class CellChunkImmutableSegment extends ImmutableSegment { // second parameter true, because in compaction/merge the addition of the cell to new segment // is always successful updateMetaInfo(c, true, null); // updates the size per cell + if(action == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS) { + //counting number of unique keys + if (prev != null) { + if (!CellUtil.matchingRowColumnBytes(prev, c)) { + numUniqueKeys++; + } + } else { + numUniqueKeys++; + } + } + prev = c; + } + if(action == MemStoreCompactionStrategy.Action.COMPACT) { + numUniqueKeys = numOfCells; + } else if(action != MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS) { + numUniqueKeys = CellSet.UNKNOWN_NUM_UNIQUES; } // build the immutable CellSet CellChunkMap ccm = new CellChunkMap(getComparator(), chunks, 0, numOfCellsAfterCompaction, false); - this.setCellSet(null, new CellSet(ccm)); // update the CellSet of this Segment + this.setCellSet(null, new CellSet(ccm, numUniqueKeys)); // update the CellSet of this Segment } /*------------------------------------------------------------------------*/ @@ -135,12 +155,13 @@ public class CellChunkImmutableSegment extends ImmutableSegment { // This is 
a service for not-flat immutable segments // Assumption: cells do not exceed chunk size! private void reinitializeCellSet( - int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet) { + int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet, + MemStoreCompactionStrategy.Action action) { Cell curCell; // calculate how many chunks we will need for metadata int chunkSize = ChunkCreator.getInstance().getChunkSize(); int numOfCellsInChunk = CellChunkMap.NUM_OF_CELL_REPS_IN_CHUNK; - int numberOfChunks = calculateNumberOfChunks(numOfCells,numOfCellsInChunk); + int numberOfChunks = calculateNumberOfChunks(numOfCells, numOfCellsInChunk); // all index Chunks are allocated from ChunkCreator Chunk[] chunks = new Chunk[numberOfChunks]; for (int i=0; i < numberOfChunks; i++) { @@ -150,6 +171,8 @@ public class CellChunkImmutableSegment extends ImmutableSegment { int currentChunkIdx = 0; int offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER; + int numUniqueKeys=0; + Cell prev = null; try { while ((curCell = segmentScanner.next()) != null) { assert (curCell instanceof ByteBufferKeyValue); // shouldn't get here anything but ByteBufferKeyValue @@ -161,6 +184,20 @@ public class CellChunkImmutableSegment extends ImmutableSegment { offsetInCurentChunk = createCellReference((ByteBufferKeyValue) curCell, chunks[currentChunkIdx].getData(), offsetInCurentChunk); + if(action == MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) { + //counting number of unique keys + if (prev != null) { + if (!CellUtil.matchingRowColumn(prev, curCell)) { + numUniqueKeys++; + } + } else { + numUniqueKeys++; + } + } + prev = curCell; + } + if(action != MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) { + numUniqueKeys = CellSet.UNKNOWN_NUM_UNIQUES; } } catch (IOException ie) { throw new IllegalStateException(ie); @@ -169,7 +206,8 @@ public class CellChunkImmutableSegment extends ImmutableSegment { } CellChunkMap ccm = new CellChunkMap(getComparator(), chunks, 0, numOfCells, false); - this.setCellSet(oldCellSet, new CellSet(ccm)); // update the CellSet of this Segment + // update the CellSet of this Segment + this.setCellSet(oldCellSet, new CellSet(ccm, numUniqueKeys)); } /*------------------------------------------------------------------------*/ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java index e16d961b910..56717ac7443 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java @@ -41,6 +41,8 @@ import org.apache.yetus.audience.InterfaceAudience; */ @InterfaceAudience.Private public class CellSet implements NavigableSet { + + public static final int UNKNOWN_NUM_UNIQUES = -1; // Implemented on top of a {@link java.util.concurrent.ConcurrentSkipListMap} // Differ from CSLS in one respect, where CSLS does "Adds the specified element to this set if it // is not already present.", this implementation "Adds the specified element to this set EVEN @@ -48,12 +50,22 @@ public class CellSet implements NavigableSet { // Otherwise, has same attributes as ConcurrentSkipListSet private final NavigableMap delegatee; /// + private final int numUniqueKeys; + CellSet(final CellComparator c) { this.delegatee = new ConcurrentSkipListMap<>(c); + this.numUniqueKeys = UNKNOWN_NUM_UNIQUES; } + CellSet(final NavigableMap m, int numUniqueKeys) { + this.delegatee = m; + 
this.numUniqueKeys = numUniqueKeys; + } + + @VisibleForTesting CellSet(final NavigableMap m) { this.delegatee = m; + this.numUniqueKeys = UNKNOWN_NUM_UNIQUES; } @VisibleForTesting @@ -83,7 +95,7 @@ public class CellSet implements NavigableSet { public NavigableSet headSet(final Cell toElement, boolean inclusive) { - return new CellSet(this.delegatee.headMap(toElement, inclusive)); + return new CellSet(this.delegatee.headMap(toElement, inclusive), UNKNOWN_NUM_UNIQUES); } public Cell higher(Cell e) { @@ -120,7 +132,7 @@ public class CellSet implements NavigableSet { } public NavigableSet tailSet(Cell fromElement, boolean inclusive) { - return new CellSet(this.delegatee.tailMap(fromElement, inclusive)); + return new CellSet(this.delegatee.tailMap(fromElement, inclusive), UNKNOWN_NUM_UNIQUES); } public Comparator comparator() { @@ -187,4 +199,8 @@ public class CellSet implements NavigableSet { public T[] toArray(T[] a) { throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED); } + + public int getNumUniqueKeys() { + return numUniqueKeys; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index e6f9451d860..d2502528f7b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.ArrayList; @@ -66,13 +67,13 @@ public class CompactingMemStore extends AbstractMemStore { // Default fraction of in-memory-flush size w.r.t. 
flush-to-disk size public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY = "hbase.memstore.inmemoryflush.threshold.factor"; - private static final double IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT = 0.25; + private static final double IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT = 0.02; private static final Log LOG = LogFactory.getLog(CompactingMemStore.class); private HStore store; private RegionServicesForStores regionServices; private CompactionPipeline pipeline; - private MemStoreCompactor compactor; + protected MemStoreCompactor compactor; private long inmemoryFlushSize; // the threshold on active size for in-memory flush private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false); @@ -81,7 +82,7 @@ public class CompactingMemStore extends AbstractMemStore { private boolean inWalReplay = false; @VisibleForTesting - private final AtomicBoolean allowCompaction = new AtomicBoolean(true); + protected final AtomicBoolean allowCompaction = new AtomicBoolean(true); private boolean compositeSnapshot = true; /** @@ -119,7 +120,8 @@ public class CompactingMemStore extends AbstractMemStore { } @VisibleForTesting - protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) { + protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) + throws IllegalArgumentIOException { return new MemStoreCompactor(this, compactionPolicy); } @@ -205,6 +207,7 @@ public class CompactingMemStore extends AbstractMemStore { } else { pushTailToSnapshot(); } + compactor.resetStats(); } return new MemStoreSnapshot(snapshotId, this.snapshot); } @@ -298,10 +301,6 @@ public class CompactingMemStore extends AbstractMemStore { this.compositeSnapshot = useCompositeSnapshot; } - public boolean isCompositeSnapshot() { - return this.compositeSnapshot; - } - public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result, boolean merge) { // last true stands for updating the region size @@ -313,8 +312,8 @@ public class CompactingMemStore extends AbstractMemStore { * with version taken earlier. This version must be passed as a parameter here. * The flattening happens only if versions match. */ - public void flattenOneSegment(long requesterVersion) { - pipeline.flattenOneSegment(requesterVersion, indexType); + public void flattenOneSegment(long requesterVersion, MemStoreCompactionStrategy.Action action) { + pipeline.flattenOneSegment(requesterVersion, indexType, action); } // setter is used only for testability @@ -543,29 +542,11 @@ public class CompactingMemStore extends AbstractMemStore { } } - //---------------------------------------------------------------------- - //methods for tests - //---------------------------------------------------------------------- @VisibleForTesting boolean isMemStoreFlushingInMemory() { return inMemoryFlushInProgress.get(); } - @VisibleForTesting - void disableCompaction() { - allowCompaction.set(false); - } - - @VisibleForTesting - void enableCompaction() { - allowCompaction.set(true); - } - - @VisibleForTesting - void initiateType(MemoryCompactionPolicy compactionType) { - compactor.initiateAction(compactionType); - } - /** * @param cell Find the row that comes after this one. If null, we return the * first. 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 75f9914ecb4..42931d067b0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -126,14 +126,10 @@ public class CompactionPipeline { } suffix = versionedList.getStoreSegments(); if (LOG.isDebugEnabled()) { - int count = 0; - if(segment != null) { - count = segment.getCellsCount(); - } LOG.debug("Swapping pipeline suffix. " + "Just before the swap the number of segments in pipeline is:" + versionedList.getStoreSegments().size() - + ", and the number of cells in new segment is:" + count); + + ", and the new segment is:" + segment); } swapSuffix(suffix, segment, closeSuffix); readOnlyCopy = new LinkedList<>(pipeline); @@ -183,7 +179,9 @@ public class CompactionPipeline { * * @return true iff a segment was successfully flattened */ - public boolean flattenOneSegment(long requesterVersion, CompactingMemStore.IndexType idxType) { + public boolean flattenOneSegment(long requesterVersion, + CompactingMemStore.IndexType idxType, + MemStoreCompactionStrategy.Action action) { if(requesterVersion != version) { LOG.warn("Segment flattening failed, because versions do not match. Requester version: " @@ -201,7 +199,7 @@ public class CompactionPipeline { if ( s.canBeFlattened() ) { MemStoreSizing newMemstoreAccounting = new MemStoreSizing(); // the size to be updated ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening( - (CSLMImmutableSegment)s,idxType,newMemstoreAccounting); + (CSLMImmutableSegment)s,idxType,newMemstoreAccounting,action); replaceAtIndex(i,newS); if(region != null) { // update the global memstore size counter diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/EagerMemStoreCompactionStrategy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/EagerMemStoreCompactionStrategy.java new file mode 100644 index 00000000000..90d0756ed58 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/EagerMemStoreCompactionStrategy.java @@ -0,0 +1,36 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.conf.Configuration; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class EagerMemStoreCompactionStrategy extends MemStoreCompactionStrategy{ + + private static final String name = "EAGER"; + public EagerMemStoreCompactionStrategy(Configuration conf, String cfName) { + super(conf, cfName); + } + + @Override + public Action getAction(VersionedSegmentsList versionedList) { + return compact(versionedList, name); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 4b83b233c4d..db900a124d4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -278,19 +278,17 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat } String className; switch (inMemoryCompaction) { - case BASIC: - case EAGER: - Class clz = - conf.getClass(MEMSTORE_CLASS_NAME, CompactingMemStore.class, CompactingMemStore.class); - className = clz.getName(); - this.memstore = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this, - this.getHRegion().getRegionServicesForStores(), inMemoryCompaction }); - break; - case NONE: - default: - className = DefaultMemStore.class.getName(); - this.memstore = ReflectionUtils.newInstance(DefaultMemStore.class, - new Object[] { conf, this.comparator }); + case NONE: + className = DefaultMemStore.class.getName(); + this.memstore = ReflectionUtils.newInstance(DefaultMemStore.class, + new Object[] { conf, this.comparator }); + break; + default: + Class clz = conf.getClass(MEMSTORE_CLASS_NAME, + CompactingMemStore.class, CompactingMemStore.class); + className = clz.getName(); + this.memstore = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this, + this.getHRegion().getRegionServicesForStores(), inMemoryCompaction }); } LOG.info("Memstore class name is " + className); this.offPeakHours = OffPeakHours.getInstance(conf); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index c1244ff7fd6..02a05c89d05 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -40,6 +40,10 @@ public abstract class ImmutableSegment extends Segment { // each sub-type of immutable segment knows whether it is flat or not protected abstract boolean canBeFlattened(); + public int getNumUniqueKeys() { + return getCellSet().getNumUniqueKeys(); + } + ///////////////////// CONSTRUCTORS ///////////////////// /**------------------------------------------------------------------------ * Empty C-tor to be used only for CompositeImmutableSegment @@ -64,7 +68,6 @@ public abstract class ImmutableSegment extends Segment { super(segment); } - ///////////////////// PUBLIC METHODS ///////////////////// public int getNumOfSegments() { @@ -75,4 +78,11 @@ public abstract class ImmutableSegment extends Segment { List res = new ArrayList<>(Arrays.asList(this)); return res; } + + @Override + public String toString() { + String res = super.toString(); + res += "Num uniques "+getNumUniqueKeys()+"; "; + return res; + } } diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactionStrategy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactionStrategy.java
new file mode 100644
index 00000000000..b262328d72b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactionStrategy.java
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * MemStoreCompactionStrategy is the root of a class hierarchy which defines the strategy for
+ * choosing the next action to apply in an (in-memory) memstore compaction.
+ * Possible actions are:
+ * - No-op - do nothing
+ * - Flatten - change the segment's index from CSLM to a flat representation
+ * - Merge - merge the indices of the segments in the pipeline
+ * - Compact - merge the indices while removing data redundancies
+ *
+ * In addition, while applying flatten/merge actions it is possible to count the number of
+ * unique keys in the result segment.
+ */
+@InterfaceAudience.Private
+public abstract class MemStoreCompactionStrategy {
+
+  protected static final Log LOG = LogFactory.getLog(MemStoreCompactionStrategy.class);
+  // The upper bound for the number of segments we store in the pipeline prior to merging.
+  public static final String COMPACTING_MEMSTORE_THRESHOLD_KEY =
+      "hbase.hregion.compacting.pipeline.segments.limit";
+  public static final int COMPACTING_MEMSTORE_THRESHOLD_DEFAULT = 4;
+
+  /**
+   * Types of actions to be done on the pipeline upon MemStoreCompaction invocation.
+   * Note that every value covers the previous ones, i.e. if MERGE is the action it implies
+   * that the youngest segment is going to be flattened anyway.
+   */
+  public enum Action {
+    NOOP,
+    FLATTEN, // flatten a segment in the pipeline
+    FLATTEN_COUNT_UNIQUE_KEYS, // flatten a segment in the pipeline and count its unique keys
+    MERGE, // merge all the segments in the pipeline into one
+    MERGE_COUNT_UNIQUE_KEYS, // merge all pipeline segments into one and count the unique keys
+    COMPACT // compact the data of all pipeline segments
+  }
+
+  protected final String cfName;
+  // The limit on the number of the segments in the pipeline
+  protected final int pipelineThreshold;
+
+  public MemStoreCompactionStrategy(Configuration conf, String cfName) {
+    this.cfName = cfName;
+    if (conf == null) {
+      pipelineThreshold = COMPACTING_MEMSTORE_THRESHOLD_DEFAULT;
+    } else {
+      pipelineThreshold = // get the limit on the number of the segments in the pipeline
+          conf.getInt(COMPACTING_MEMSTORE_THRESHOLD_KEY, COMPACTING_MEMSTORE_THRESHOLD_DEFAULT);
+    }
+  }
+
+  // get next compaction action to apply on compaction pipeline
+  public abstract Action getAction(VersionedSegmentsList versionedList);
+
+  // update policy stats based on the segment that replaced previous versioned list (in
+  // compaction pipeline)
+  public void updateStats(Segment replacement) {}
+
+  // resets policy stats
+  public void resetStats() {}
+
+  protected Action simpleMergeOrFlatten(VersionedSegmentsList versionedList, String strategy) {
+    int numOfSegments = versionedList.getNumOfSegments();
+    if (numOfSegments > pipelineThreshold) {
+      // to avoid too many segments, merge now
+      LOG.debug(strategy + " memory compaction for store " + cfName
+          + " merging " + numOfSegments + " segments");
+      return getMergingAction();
+    }
+
+    // just flatten a segment
+    LOG.debug(strategy + " memory compaction for store " + cfName
+        + " flattening a segment in the pipeline");
+    return getFlattenAction();
+  }
+
+  protected Action getMergingAction() {
+    return Action.MERGE;
+  }
+
+  protected Action getFlattenAction() {
+    return Action.FLATTEN;
+  }
+
+  protected Action compact(VersionedSegmentsList versionedList, String strategyInfo) {
+    int numOfSegments = versionedList.getNumOfSegments();
+    LOG.debug(strategyInfo + " memory compaction for store " + cfName
+        + " compacting " + numOfSegments + " segments");
+    return Action.COMPACT;
+  }
+}
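The abstract class above is the extension point; BASIC, EAGER and ADAPTIVE override getAction() (and, in ADAPTIVE's case, the merge/flatten hooks). As an illustration only, a hypothetical strategy that merges as soon as a second segment accumulates could look like this (it must live in the same package to see the collaborator classes):

package org.apache.hadoop.hbase.regionserver;

import org.apache.hadoop.conf.Configuration;

// Hypothetical subclass, not part of the patch: merge whenever more than one
// segment sits in the pipeline, flatten otherwise.
public class GreedyMergeStrategy extends MemStoreCompactionStrategy {
  public GreedyMergeStrategy(Configuration conf, String cfName) {
    super(conf, cfName);
  }

  @Override
  public Action getAction(VersionedSegmentsList versionedList) {
    return versionedList.getNumOfSegments() > 1 ? Action.MERGE : Action.FLATTEN;
  }
}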
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
index 410259d710d..af2e1bb466b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
@@ -18,9 +18,11 @@
  */
 package org.apache.hadoop.hbase.regionserver;

+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MemoryCompactionPolicy;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -44,26 +46,16 @@
 @InterfaceAudience.Private
 public class MemStoreCompactor {

-  // The upper bound for the number of segments we store in the pipeline prior to merging.
-  // This constant is subject to further experimentation.
-  // The external setting of the compacting MemStore behaviour
-  public static final String COMPACTING_MEMSTORE_THRESHOLD_KEY =
-      "hbase.hregion.compacting.pipeline.segments.limit";
-  // remaining with the same ("infinity") but configurable default for now
-  public static final int COMPACTING_MEMSTORE_THRESHOLD_DEFAULT = 1;
-
   public static final long DEEP_OVERHEAD = ClassSize
-      .align(ClassSize.OBJECT
-          + 4 * ClassSize.REFERENCE
-          // compactingMemStore, versionedList, action, isInterrupted (the reference)
+      .align(ClassSize.OBJECT + 4 * ClassSize.REFERENCE
+          // compactingMemStore, versionedList, isInterrupted, strategy (the reference)
           // "action" is an enum and thus it is a class with static final constants,
           // so counting only the size of the reference to it and not the size of the internals
-          + 2 * Bytes.SIZEOF_INT // compactionKVMax, pipelineThreshold
+          + Bytes.SIZEOF_INT // compactionKVMax
           + ClassSize.ATOMIC_BOOLEAN // isInterrupted (the internals)
       );

   private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class);
-  private final int pipelineThreshold; // the limit on the number of the segments in the pipeline
   private CompactingMemStore compactingMemStore;

   // a static version of the segment list from the pipeline
@@ -75,29 +67,15 @@
   // the limit to the size of the groups to be later provided to MemStoreSegmentsIterator
   private final int compactionKVMax;

-  /**
-   * Types of actions to be done on the pipeline upon MemStoreCompaction invocation.
-   * Note that every value covers the previous ones, i.e. if MERGE is the action it implies
-   * that the youngest segment is going to be flatten anyway.
-   */
-  public enum Action {
-    NOOP,
-    FLATTEN, // flatten the youngest segment in the pipeline
-    MERGE, // merge all the segments in the pipeline into one
-    COMPACT // copy-compact the data of all the segments in the pipeline
-  }
-
-  private Action action = Action.FLATTEN;
+  private MemStoreCompactionStrategy strategy;

   public MemStoreCompactor(CompactingMemStore compactingMemStore,
-      MemoryCompactionPolicy compactionPolicy) {
+      MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException {
     this.compactingMemStore = compactingMemStore;
     this.compactionKVMax = compactingMemStore.getConfiguration()
         .getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
-    initiateAction(compactionPolicy);
-    pipelineThreshold = // get the limit on the number of the segments in the pipeline
-        compactingMemStore.getConfiguration().getInt(COMPACTING_MEMSTORE_THRESHOLD_KEY,
-            COMPACTING_MEMSTORE_THRESHOLD_DEFAULT);
+    initiateCompactionStrategy(compactionPolicy, compactingMemStore.getConfiguration(),
+        compactingMemStore.getFamilyName());
   }

   /**----------------------------------------------------------------------
@@ -132,11 +110,9 @@
     isInterrupted.compareAndSet(false, true);
   }

-  /**----------------------------------------------------------------------
-   * The interface to check whether user requested the index-compaction
-   */
-  public boolean isIndexCompaction() {
-    return (action == Action.MERGE);
+
+  public void resetStats() {
+    strategy.resetStats();
   }

   /**----------------------------------------------------------------------
@@ -148,40 +124,6 @@
     versionedList = null;
   }

-  /**----------------------------------------------------------------------
-   * Decide what to do with the new and old segments in the compaction pipeline.
-   * Implements basic in-memory compaction policy.
- */ - private Action policy() { - - if (isInterrupted.get()) { // if the entire process is interrupted cancel flattening - return Action.NOOP; // the compaction also doesn't start when interrupted - } - - if (action == Action.COMPACT) { // compact according to the user request - LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName() - + " is going to be compacted to the " + compactingMemStore.getIndexType() + ". Number of" - + " cells before compaction is " + versionedList.getNumOfCells()); - return Action.COMPACT; - } - - // compaction shouldn't happen or doesn't worth it - // limit the number of the segments in the pipeline - int numOfSegments = versionedList.getNumOfSegments(); - if (numOfSegments > pipelineThreshold) { - LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName() - + " is going to be merged to the " + compactingMemStore.getIndexType() - + ", as there are " + numOfSegments + " segments"); - return Action.MERGE; // to avoid too many segments, merge now - } - - // if nothing of the above, then just flatten the newly joined segment - LOG.debug("The youngest segment in the in-Memory Compaction Pipeline for store " - + compactingMemStore.getFamilyName() + " is going to be flattened to the " - + compactingMemStore.getIndexType()); - return Action.FLATTEN; - } - /**---------------------------------------------------------------------- * The worker thread performs the compaction asynchronously. * The solo (per compactor) thread only reads the compaction pipeline. @@ -190,29 +132,37 @@ public class MemStoreCompactor { private void doCompaction() { ImmutableSegment result = null; boolean resultSwapped = false; - Action nextStep = null; - try { - nextStep = policy(); + if (isInterrupted.get()) { // if the entire process is interrupted cancel flattening + return; // the compaction also doesn't start when interrupted + } - if (nextStep == Action.NOOP) { + MemStoreCompactionStrategy.Action nextStep = strategy.getAction(versionedList); + boolean merge = + (nextStep == MemStoreCompactionStrategy.Action.MERGE || + nextStep == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS); + try { + if (nextStep == MemStoreCompactionStrategy.Action.NOOP) { return; } - if (nextStep == Action.FLATTEN) { - // Youngest Segment in the pipeline is with SkipList index, make it flat - compactingMemStore.flattenOneSegment(versionedList.getVersion()); + if (nextStep == MemStoreCompactionStrategy.Action.FLATTEN || + nextStep == MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) { + // some Segment in the pipeline is with SkipList index, make it flat + compactingMemStore.flattenOneSegment(versionedList.getVersion(), nextStep); return; } // Create one segment representing all segments in the compaction pipeline, // either by compaction or by merge if (!isInterrupted.get()) { - result = createSubstitution(); + result = createSubstitution(nextStep); } // Substitute the pipeline with one segment if (!isInterrupted.get()) { if (resultSwapped = compactingMemStore.swapCompactedSegments( - versionedList, result, (action==Action.MERGE))) { + versionedList, result, merge)) { + // update compaction strategy + strategy.updateStats(result); // update the wal so it can be truncated and not get too long compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater } @@ -226,10 +176,8 @@ public class MemStoreCompactor { // we DON'T need to close the result segment (meaning its MSLAB)! 
// Because closing the result segment means closing the chunks of all segments // in the compaction pipeline, which still have ongoing scans. - if (nextStep != Action.MERGE) { - if ((result != null) && (!resultSwapped)) { - result.close(); - } + if (!merge && (result != null) && !resultSwapped) { + result.close(); } releaseResources(); } @@ -240,7 +188,8 @@ public class MemStoreCompactor { * Creation of the ImmutableSegment either by merge or copy-compact of the segments of the * pipeline, based on the Compactor Iterator. The new ImmutableSegment is returned. */ - private ImmutableSegment createSubstitution() throws IOException { + private ImmutableSegment createSubstitution(MemStoreCompactionStrategy.Action action) throws + IOException { ImmutableSegment result = null; MemStoreSegmentsIterator iterator = null; @@ -248,46 +197,49 @@ public class MemStoreCompactor { switch (action) { case COMPACT: iterator = new MemStoreCompactorSegmentsIterator(versionedList.getStoreSegments(), - compactingMemStore.getComparator(), compactionKVMax, compactingMemStore.getStore()); + compactingMemStore.getComparator(), + compactionKVMax, compactingMemStore.getStore()); result = SegmentFactory.instance().createImmutableSegmentByCompaction( compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, - versionedList.getNumOfCells(), compactingMemStore.getIndexType()); - iterator.close(); - break; - case MERGE: - iterator = - new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(), - compactingMemStore.getComparator(), - compactionKVMax); + versionedList.getNumOfCells(), compactingMemStore.getIndexType(), action); + iterator.close(); + break; + case MERGE: + case MERGE_COUNT_UNIQUE_KEYS: + iterator = + new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(), + compactingMemStore.getComparator(), compactionKVMax); result = SegmentFactory.instance().createImmutableSegmentByMerge( compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, versionedList.getNumOfCells(), versionedList.getStoreSegments(), - compactingMemStore.getIndexType()); - iterator.close(); - break; - default: throw new RuntimeException("Unknown action " + action); // sanity check + compactingMemStore.getIndexType(), action); + iterator.close(); + break; + default: + throw new RuntimeException("Unknown action " + action); // sanity check } return result; } - /**---------------------------------------------------------------------- - * Initiate the action according to user config, after its default is Action.MERGE - */ @VisibleForTesting - void initiateAction(MemoryCompactionPolicy compType) { + void initiateCompactionStrategy(MemoryCompactionPolicy compType, + Configuration configuration, String cfName) throws IllegalArgumentIOException { + + assert (compType !=MemoryCompactionPolicy.NONE); switch (compType){ - case NONE: action = Action.NOOP; - break; - case BASIC: action = Action.MERGE; - break; - case EAGER: action = Action.COMPACT; - break; - default: - throw new RuntimeException("Unknown memstore type " + compType); // sanity check + case BASIC: strategy = new BasicMemStoreCompactionStrategy(configuration, cfName); + break; + case EAGER: strategy = new EagerMemStoreCompactionStrategy(configuration, cfName); + break; + case ADAPTIVE: strategy = new AdaptiveMemStoreCompactionStrategy(configuration, cfName); + break; + default: + // sanity check + throw new IllegalArgumentIOException("Unknown memory compaction type " + compType); } } } diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
index 43836f48525..db0b319cc75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
@@ -51,13 +51,13 @@
   // for compaction
   public ImmutableSegment createImmutableSegmentByCompaction(final Configuration conf,
       final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
-      CompactingMemStore.IndexType idxType)
+      CompactingMemStore.IndexType idxType, MemStoreCompactionStrategy.Action action)
       throws IOException {

     MemStoreLAB memStoreLAB = MemStoreLAB.newInstance(conf);
     return createImmutableSegment(
-        conf,comparator,iterator,memStoreLAB,numOfCells,MemStoreCompactor.Action.COMPACT,idxType);
+        conf,comparator,iterator,memStoreLAB,numOfCells,action,idxType);
   }

   // create empty immutable segment
@@ -82,13 +82,14 @@
   // for merge
   public ImmutableSegment createImmutableSegmentByMerge(final Configuration conf,
       final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
-      List<ImmutableSegment> segments, CompactingMemStore.IndexType idxType)
+      List<ImmutableSegment> segments, CompactingMemStore.IndexType idxType,
+      MemStoreCompactionStrategy.Action action)
       throws IOException {

     MemStoreLAB memStoreLAB = getMergedMemStoreLAB(conf, segments);
     return createImmutableSegment(
-        conf,comparator,iterator,memStoreLAB,numOfCells,MemStoreCompactor.Action.MERGE,idxType);
+        conf,comparator,iterator,memStoreLAB,numOfCells,action,idxType);
   }

@@ -96,18 +97,18 @@
   // for flattening
   public ImmutableSegment createImmutableSegmentByFlattening(
       CSLMImmutableSegment segment, CompactingMemStore.IndexType idxType,
-      MemStoreSizing memstoreSizing) {
+      MemStoreSizing memstoreSizing, MemStoreCompactionStrategy.Action action) {
     ImmutableSegment res = null;
     switch (idxType) {
-      case CHUNK_MAP:
-        res = new CellChunkImmutableSegment(segment, memstoreSizing);
-        break;
-      case CSLM_MAP:
-        assert false; // non-flat segment can not be the result of flattening
-        break;
-      case ARRAY_MAP:
-        res = new CellArrayImmutableSegment(segment, memstoreSizing);
-        break;
+      case CHUNK_MAP:
+        res = new CellChunkImmutableSegment(segment, memstoreSizing, action);
+        break;
+      case CSLM_MAP:
+        assert false; // non-flat segment can not be the result of flattening
+        break;
+      case ARRAY_MAP:
+        res = new CellArrayImmutableSegment(segment, memstoreSizing, action);
+        break;
     }
     return res;
   }

@@ -116,7 +117,7 @@
   //****** private methods to instantiate concrete store segments **********//
   private ImmutableSegment createImmutableSegment(final Configuration conf,
       final CellComparator comparator, MemStoreSegmentsIterator iterator, MemStoreLAB memStoreLAB,
-      int numOfCells, MemStoreCompactor.Action action, CompactingMemStore.IndexType idxType) {
+      int numOfCells, MemStoreCompactionStrategy.Action action, CompactingMemStore.IndexType idxType) {
     ImmutableSegment res = null;
     switch (idxType) {
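The next hunk adds getEstimatedUniquesFrac(), the statistic that feeds AdaptiveMemStoreCompactionStrategy: it takes the uniques/cells ratio of the largest segment whose uniques count is known. A standalone restatement of that estimate, with plain arrays standing in for segments and -1 marking an unknown count (as CellSet.UNKNOWN_NUM_UNIQUES does):

// Sketch of the estimation in the hunk below: trust the ratio from the largest
// segment with a known uniques count; segments with unknown counts are skipped,
// and the estimate defaults to 1.0 (all keys unique) when nothing is known.
public class UniquesFracSketch {
  static double estimate(int[] cellCounts, int[] uniqueCounts) {
    int maxCells = 0;
    double est = 1.0;
    for (int i = 0; i < cellCounts.length; i++) {
      if (uniqueCounts[i] != -1 && cellCounts[i] > maxCells) {
        maxCells = cellCounts[i];
        est = (double) uniqueCounts[i] / cellCounts[i];
      }
    }
    return est;
  }

  public static void main(String[] args) {
    // Segments: 100 cells / 40 uniques, 50 cells / unknown, 20 cells / 20 uniques.
    System.out.println(estimate(new int[]{100, 50, 20}, new int[]{40, -1, 20})); // 0.4
  }
}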
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
index 4269863ad3d..a697912ee89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.util.List;
 
 import org.apache.yetus.audience.InterfaceAudience;
@@ -62,4 +64,29 @@ public class VersionedSegmentsList {
   public int getNumOfSegments() {
     return storeSegments.size();
   }
+
+  // Estimates fraction of unique keys
+  @VisibleForTesting
+  double getEstimatedUniquesFrac() {
+    int segmentCells = 0;
+    int maxCells = 0;
+    double est = 0;
+
+    for (ImmutableSegment s : storeSegments) {
+      double segmentUniques = s.getNumUniqueKeys();
+      if(segmentUniques != CellSet.UNKNOWN_NUM_UNIQUES) {
+        segmentCells = s.getCellsCount();
+        if(segmentCells > maxCells) {
+          maxCells = segmentCells;
+          est = segmentUniques / segmentCells;
+        }
+      }
+      // else ignore this segment specifically since if the unique number is unknown counting
+      // cells can be expensive
+    }
+    if(maxCells == 0) {
+      return 1.0;
+    }
+    return est;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index b7592611747..2ef200f5c08 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -368,7 +368,7 @@ public class TestIOFencing {
         Thread.sleep(1000);
       }
     }
-    if (policy == MemoryCompactionPolicy.EAGER) {
+    if (policy == MemoryCompactionPolicy.EAGER || policy == MemoryCompactionPolicy.ADAPTIVE) {
       assertTrue(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT >= count);
     } else {
       assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, count);
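getEstimatedUniquesFrac() above picks the largest segment whose unique-key count is known and returns uniques divided by cells for that segment, falling back to 1.0 when no segment has a known count. A standalone toy model of the same arithmetic, using plain arrays instead of ImmutableSegments (the class itself is illustrative only):

final class UniquesFracModel {
  private UniquesFracModel() {}

  static double estimate(int[] cellCounts, int[] uniqueCounts, int unknownMarker) {
    int maxCells = 0;
    double est = 1.0; // no usable segment: assume every key is unique
    for (int i = 0; i < cellCounts.length; i++) {
      // Only segments with a known uniques count participate; take the largest one.
      if (uniqueCounts[i] != unknownMarker && cellCounts[i] > maxCells) {
        maxCells = cellCounts[i];
        est = (double) uniqueCounts[i] / cellCounts[i];
      }
    }
    return est;
  }

  public static void main(String[] args) {
    // Mirrors testMagicCompaction3Buckets below: 8 cells, 4 unique keys => 0.5
    System.out.println(estimate(new int[] { 8 }, new int[] { 4 }, -1));
  }
}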
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index e91520ea8dc..19941b9bcb6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -26,13 +26,27 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueTestUtil;
+import org.apache.hadoop.hbase.MemoryCompactionPolicy;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
@@ -74,7 +88,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
   @Before
   public void setUp() throws Exception {
     compactingSetUp();
-    this.memstore = new CompactingMemStore(HBaseConfiguration.create(), CellComparatorImpl.COMPARATOR,
+    this.memstore = new MyCompactingMemStore(HBaseConfiguration.create(), CellComparatorImpl
+        .COMPARATOR,
         store, regionServicesForStores, MemoryCompactionPolicy.EAGER);
   }
 
@@ -483,7 +498,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
     memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
        String.valueOf(compactionType));
-    ((CompactingMemStore)memstore).initiateType(compactionType);
+    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
 
     byte[] row = Bytes.toBytes("testrow");
     byte[] fam = Bytes.toBytes("testfamily");
@@ -498,7 +513,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     memstore.add(new KeyValue(row, fam, qf3, 1, val), null);
 
     // Creating a pipeline
-    ((CompactingMemStore)memstore).disableCompaction();
+    ((MyCompactingMemStore)memstore).disableCompaction();
     ((CompactingMemStore)memstore).flushInMemory();
 
     // Adding value to "new" memstore
@@ -513,7 +528,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     List<KeyValueScanner> scanners = memstore.getScanners(0);
     // Shouldn't putting back the chunks to pool,since some scanners are opening
     // based on their data
-    ((CompactingMemStore)memstore).enableCompaction();
+    ((MyCompactingMemStore)memstore).enableCompaction();
     // trigger compaction
     ((CompactingMemStore)memstore).flushInMemory();
 
@@ -564,60 +579,6 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     assertTrue(chunkCreator.getPoolSize() > 0);
   }
 
-  @Test
-  public void testFlatteningToCellChunkMap() throws IOException {
-
-    // set memstore to flat into CellChunkMap
-    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
-    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
-        String.valueOf(compactionType));
-    ((CompactingMemStore)memstore).initiateType(compactionType);
-    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
-        String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
-    ((CompactingMemStore)memstore).setIndexType();
-    int numOfCells = 8;
-    String[] keys1 = { "A", "A", "B", "C", "D", "D", "E", "F" }; //A1, A2, B3, C4, D5, D6, E7, F8
-
-    // make one cell
-    byte[] row = Bytes.toBytes(keys1[0]);
-    byte[] val = Bytes.toBytes(keys1[0] + 0);
-    KeyValue kv =
-        new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
-            System.currentTimeMillis(), val);
-
-    // test 1 bucket
-    int totalCellsLen = addRowsByKeys(memstore, keys1);
-    long oneCellOnCSLMHeapSize =
-        ClassSize.align(
-            ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + KeyValueUtil
-                .length(kv));
-
-    long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
-    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
-    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
-
-    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and flatten
-    assertEquals(0, memstore.getSnapshot().getCellsCount());
-    // One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode.
-    // totalCellsLen should remain the same
-    long oneCellOnCCMHeapSize =
-        ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
-    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
-        + numOfCells * oneCellOnCCMHeapSize;
-
-    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
-    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
-
-    MemStoreSize size = memstore.getFlushableSize();
-    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
-    region.decrMemStoreSize(size); // simulate flusher
-    ImmutableSegment s = memstore.getSnapshot();
-    assertEquals(numOfCells, s.getCellsCount());
-    assertEquals(0, regionServicesForStores.getMemStoreSize());
-
-    memstore.clearSnapshot(snapshot.getId());
-  }
-
   //////////////////////////////////////////////////////////////////////////////
   // Compaction tests
   //////////////////////////////////////////////////////////////////////////////
@@ -629,7 +590,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
     memstore.getConfiguration()
         .set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(compactionType));
-    ((CompactingMemStore)memstore).initiateType(compactionType);
+    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
 
     String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4
 
@@ -668,7 +629,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
     memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
         String.valueOf(compactionType));
+    memstore.getConfiguration().set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY,
+        String.valueOf(1));
-    ((CompactingMemStore)memstore).initiateType(compactionType);
+    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
 
     String[] keys1 = { "A", "A", "B", "C" };
     String[] keys2 = { "A", "B", "D" };
 
@@ -724,7 +687,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
     memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
         String.valueOf(compactionType));
-    ((CompactingMemStore)memstore).initiateType(compactionType);
+    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
     String[] keys1 = { "A", "A", "B", "C" };
     String[] keys2 = { "A", "B", "D" };
     String[] keys3 = { "D", "B", "B" };
@@ -753,7 +716,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
     assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
 
-    ((CompactingMemStore) memstore).disableCompaction();
+    ((MyCompactingMemStore) memstore).disableCompaction();
     MemStoreSize size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
     assertEquals(0, memstore.getSnapshot().getCellsCount());
@@ -769,7 +732,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
         + 3 * oneCellOnCSLMHeapSize;
     assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize());
 
-    ((CompactingMemStore)memstore).enableCompaction();
+    ((MyCompactingMemStore)memstore).enableCompaction();
     size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
     assertEquals(0, memstore.getSnapshot().getCellsCount());
 
@@ -793,6 +756,67 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     memstore.clearSnapshot(snapshot.getId());
   }
 
+  @Test
+  public void testMagicCompaction3Buckets() throws IOException {
+
+    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.ADAPTIVE;
+    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
+        String.valueOf(compactionType));
+    memstore.getConfiguration().setDouble(
+        AdaptiveMemStoreCompactionStrategy.ADAPTIVE_COMPACTION_THRESHOLD_KEY, 0.45);
+    memstore.getConfiguration().setInt(
+        AdaptiveMemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 2);
+    memstore.getConfiguration().setInt(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 1);
+    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
+
+    String[] keys1 = { "A", "B", "D" };
+    String[] keys2 = { "A" };
+    String[] keys3 = { "A", "A", "B", "C" };
+    String[] keys4 = { "D", "B", "B" };
+
+    int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 3 cells.
+    int oneCellOnCSLMHeapSize = 120;
+    assertEquals(totalCellsLen1, region.getMemStoreSize());
+    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 3 * oneCellOnCSLMHeapSize;
+    assertEquals(totalHeapSize, memstore.heapSize());
+
+    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline - flatten
+    assertEquals(3, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
+    assertEquals(1.0,
+        ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+
+    addRowsByKeys(memstore, keys2);// Adding 1 more cell - flatten.
+    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
+    assertEquals(4, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
+    assertEquals(1.0,
+        ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+
+    addRowsByKeys(memstore, keys3);// Adding 4 more cells - merge.
+    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
+    assertEquals(8, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
+    assertEquals((4.0 / 8.0),
+        ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+
+    addRowsByKeys(memstore, keys4);// 3 more cells added - compact (or not)
+    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
+    int numCells = ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells();
+    assertTrue(4 == numCells || 11 == numCells);
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+
+    MemStoreSize size = memstore.getFlushableSize();
+    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
+    region.decrMemStoreSize(size); // simulate flusher
+    ImmutableSegment s = memstore.getSnapshot();
+    numCells = s.getCellsCount();
+    assertTrue(4 == numCells || 11 == numCells);
+    assertEquals(0, regionServicesForStores.getMemStoreSize());
+
+    memstore.clearSnapshot(snapshot.getId());
+  }
+
   private int addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
     byte[] fam = Bytes.toBytes("testfamily");
     byte[] qf = Bytes.toBytes("testqualifier");
@@ -826,4 +850,25 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     }
   }
 
+  static protected class MyCompactingMemStore extends CompactingMemStore {
+
+    public MyCompactingMemStore(Configuration conf, CellComparator c, HStore store,
+        RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
+        throws IOException {
+      super(conf, c, store, regionServices, compactionPolicy);
+    }
+
+    void disableCompaction() {
+      allowCompaction.set(false);
+    }
+
+    void enableCompaction() {
+      allowCompaction.set(true);
+    }
+
+    void initiateType(MemoryCompactionPolicy compactionType, Configuration conf)
+        throws IllegalArgumentIOException {
+      compactor.initiateCompactionStrategy(compactionType, conf, "CF_TEST");
+    }
+
+  }
 }
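The new testMagicCompaction3Buckets() above exercises the full ADAPTIVE ladder: the pipeline flattens while every key is unique, merges (counting unique keys) once it grows past two segments, and may compact once the estimated unique fraction falls below the 0.45 threshold; the test accepts both 4 and 11 surviving cells because the estimate comes from a single-segment heuristic. The knobs it turns can be collected in one place; in the sketch below the helper class is illustrative, while the constants and values are exactly the ones the test sets (same-package access to the regionserver classes is assumed).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;

public class AdaptiveConfigSketch {
  public static Configuration adaptiveConf() {
    Configuration conf = HBaseConfiguration.create();
    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
        String.valueOf(MemoryCompactionPolicy.ADAPTIVE));
    // Compact (drop duplicate cells) only when the estimated fraction of unique
    // keys in the pipeline falls below this threshold; otherwise merge indexes.
    conf.setDouble(AdaptiveMemStoreCompactionStrategy.ADAPTIVE_COMPACTION_THRESHOLD_KEY, 0.45);
    // Pipeline length at which merging (rather than flattening) kicks in.
    conf.setInt(AdaptiveMemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 2);
    // Test-only aggressiveness: flush into the pipeline as early as possible.
    conf.setInt(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 1);
    return conf;
  }
}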
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
index ce0075e0383..a2e4c475125 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
@@ -21,7 +21,11 @@ package org.apache.hadoop.hbase.regionserver;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.MemoryCompactionPolicy;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -76,7 +80,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
         String.valueOf(MemoryCompactionPolicy.EAGER));
 
     this.memstore =
-        new CompactingMemStore(conf, CellComparatorImpl.COMPARATOR, store,
+        new MyCompactingMemStore(conf, CellComparatorImpl.COMPARATOR, store,
            regionServicesForStores, MemoryCompactionPolicy.EAGER);
   }
 
@@ -229,7 +233,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
     assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
     assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
 
-    ((CompactingMemStore) memstore).disableCompaction();
+    ((MyCompactingMemStore) memstore).disableCompaction();
     size = memstore.getFlushableSize();
     ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
     totalHeapSize2 = totalHeapSize2 + CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
@@ -244,7 +248,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
     assertEquals(totalHeapSize1 + totalHeapSize2 + totalHeapSize3,
         ((CompactingMemStore) memstore).heapSize());
 
-    ((CompactingMemStore) memstore).enableCompaction();
+    ((MyCompactingMemStore) memstore).enableCompaction();
     size = memstore.getFlushableSize();
     ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
     while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
@@ -293,7 +297,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
     MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
     memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
         String.valueOf(compactionType));
-    ((CompactingMemStore)memstore).initiateType(compactionType);
+    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
 
     addRowsByKeys(memstore, keys1);
 
     ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline should not compact
@@ -311,7 +315,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
     }
     assertEquals(12, counter2);
 
-    ((CompactingMemStore) memstore).disableCompaction();
+    ((MyCompactingMemStore) memstore).disableCompaction();
     ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening
     assertEquals(0, memstore.getSnapshot().getCellsCount());
 
@@ -330,7 +334,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
     }
     assertEquals(16, counter4);
 
-    ((CompactingMemStore) memstore).enableCompaction();
+    ((MyCompactingMemStore) memstore).enableCompaction();
 
     ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
 
@@ -372,7 +376,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
     MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
     memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
         String.valueOf(compactionType));
-    ((CompactingMemStore)memstore).initiateType(compactionType);
+    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
     testTimeRange(false);
   }
 
@@ -603,6 +607,61 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
     assertTrue(chunkCount > 0);
   }
 
+  @Test
+  public void testFlatteningToCellChunkMap() throws IOException {
+    if(!toCellChunkMap) {
+      return;
+    }
+    // set memstore to flat into CellChunkMap
+    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
+    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
+        String.valueOf(compactionType));
+    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
+    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
+        String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
+    ((CompactingMemStore)memstore).setIndexType();
+    int numOfCells = 8;
+    String[] keys1 = { "A", "A", "B", "C", "D", "D", "E", "F" }; //A1, A2, B3, C4, D5, D6, E7, F8
+
+    // make one cell
+    byte[] row = Bytes.toBytes(keys1[0]);
+    byte[] val = Bytes.toBytes(keys1[0] + 0);
+    KeyValue kv =
+        new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
+            System.currentTimeMillis(), val);
+
+    // test 1 bucket
+    long totalCellsLen = addRowsByKeys(memstore, keys1);
+    long oneCellOnCSLMHeapSize =
+        ClassSize.align(
+            ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + KeyValueUtil
+                .length(kv));
+
+    long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
+    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
+    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
+
+    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and flatten
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    // One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode.
+    // totalCellsLen should remain the same
+    long oneCellOnCCMHeapSize =
+        ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
+    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
+        + numOfCells * oneCellOnCCMHeapSize;
+
+    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
+    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
+
+    MemStoreSize size = memstore.getFlushableSize();
+    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
+    region.decrMemStoreSize(size); // simulate flusher
+    ImmutableSegment s = memstore.getSnapshot();
+    assertEquals(numOfCells, s.getCellsCount());
+    assertEquals(0, regionServicesForStores.getMemStoreSize());
+
+    memstore.clearSnapshot(snapshot.getId());
+  }
+
   private long addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
     byte[] fam = Bytes.toBytes("testfamily");
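testFlatteningToCellChunkMap() moved into this fixture and is now guarded by toCellChunkMap, so it only runs in the CellChunkMap variant of the test. Selecting the flat-index representation is itself plain configuration; a minimal sketch (the helper class is illustrative, the constant and enum come from CompactingMemStore, and same-package access is assumed):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class IndexTypeSketch {
  public static Configuration chunkMapConf() {
    Configuration conf = HBaseConfiguration.create();
    // CHUNK_MAP keeps the flattened segment index inside MSLAB chunks;
    // ARRAY_MAP keeps it in a flat on-heap cell array.
    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
        String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
    return conf;
  }
}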
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 64ee8dca8ad..4a414d2a08f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -79,6 +79,7 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.io.compress.Compression;
@@ -1177,7 +1178,8 @@ public class TestHStore {
   }
 
   @Test
-  public void testFlushBeforeCompletingScanWithFilterHint() throws IOException, InterruptedException {
+  public void testFlushBeforeCompletingScanWithFilterHint() throws IOException,
+      InterruptedException {
     final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
     final int expectedSize = 2;
     testFlushBeforeCompletingScan(new MyListHook() {
@@ -1364,7 +1366,8 @@ public class TestHStore {
       myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
       long snapshotId = id++;
       // push older data into snapshot -- phase (1/4)
-      StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY);
+      StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId, FlushLifeCycleTracker
+          .DUMMY);
       storeFlushCtx.prepare();
 
       // insert current data into active -- phase (2/4)
@@ -1464,7 +1467,7 @@ public class TestHStore {
     conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
     conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
     // Set the lower threshold to invoke the "MERGE" policy
-    conf.set(MemStoreCompactor.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
+    conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
     init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
         .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
     byte[] value = Bytes.toBytes("thisisavarylargevalue");
@@ -1551,8 +1554,8 @@ public class TestHStore {
   private class MyStore extends HStore {
     private final MyStoreHook hook;
 
-    MyStore(final HRegion region, final ColumnFamilyDescriptor family, final Configuration confParam,
-        MyStoreHook hook, boolean switchToPread) throws IOException {
+    MyStore(final HRegion region, final ColumnFamilyDescriptor family, final Configuration
+        confParam, MyStoreHook hook, boolean switchToPread) throws IOException {
       super(region, family, confParam);
       this.hook = hook;
     }
@@ -1669,7 +1672,8 @@ public class TestHStore {
   private static class MyMemStoreCompactor extends MemStoreCompactor {
     private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
     private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
-    public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy compactionPolicy) {
+    public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy
+        compactionPolicy) throws IllegalArgumentIOException {
       super(compactingMemStore, compactionPolicy);
     }
 
@@ -1697,7 +1701,8 @@ public class TestHStore {
     }
 
     @Override
-    protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) {
+    protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
+        throws IllegalArgumentIOException {
       return new MyMemStoreCompactor(this, compactionPolicy);
     }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
index e559b489ed6..2c9a4375edd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
@@ -141,7 +141,7 @@ public class TestRecoveredEdits {
     // Our 0000000000000016310 is 10MB. Most of the edits are for one region. Lets assume that if
     // we flush at 1MB, that there are at least 3 flushed files that are there because of the
     // replay of edits.
-    if(policy == MemoryCompactionPolicy.EAGER) {
+    if(policy == MemoryCompactionPolicy.EAGER || policy == MemoryCompactionPolicy.ADAPTIVE) {
       assertTrue("Files count=" + storeFiles.size(), storeFiles.size() >= 1);
     } else {
       assertTrue("Files count=" + storeFiles.size(), storeFiles.size() > 10);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
index a012d093de3..6a647964db0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
@@ -775,7 +775,7 @@ public class TestWalAndCompactingMemStoreFlush {
     conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
         String.valueOf(MemoryCompactionPolicy.BASIC));
     // length of pipeline that requires merge
-    conf.setInt(MemStoreCompactor.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1);
+    conf.setInt(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1);
     // Intialize the HRegion
     HRegion region = initHRegion("testSelectiveFlushWithBasicAndMerge", conf);
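The pipeline-length threshold referenced above simply moved from MemStoreCompactor to MemStoreCompactionStrategy; user-facing behaviour is unchanged. For reference, the flush test's setup in isolation (the helper class is illustrative and assumes the org.apache.hadoop.hbase.regionserver package; the constants and values mirror the test above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;

public class BasicMergeConfigSketch {
  public static Configuration basicMergeConf() {
    Configuration conf = HBaseConfiguration.create();
    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
        String.valueOf(MemoryCompactionPolicy.BASIC));
    // A threshold of 1 means a single pipeline segment already triggers an
    // index merge on the next in-memory flush.
    conf.setInt(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1);
    return conf;
  }
}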