HBASE-17081 Flush the entire CompactingMemStore content to disk (Anastasia

Braginsky)
This commit is contained in:
Ramkrishna 2016-12-15 22:02:05 +05:30
parent 401e83cee3
commit a2a7618d26
14 changed files with 700 additions and 174 deletions

View File

@ -159,14 +159,12 @@ public abstract class AbstractMemStore implements MemStore {
public String toString() {
StringBuffer buf = new StringBuffer();
int i = 1;
try {
for (Segment segment : getSegments()) {
buf.append("Segment (" + i + ") " + segment.toString() + "; ");
i++;
}
} catch (IOException e){
return e.toString();
for (Segment segment : getSegments()) {
buf.append("Segment (" + i + ") " + segment.toString() + "; ");
i++;
}
return buf.toString();
}
@ -232,6 +230,7 @@ public abstract class AbstractMemStore implements MemStore {
* @return Next row or null if none found. If one found, will be a new
* KeyValue -- can be destroyed by subsequent calls to this method.
*/
@VisibleForTesting
protected Cell getNextRow(final Cell key,
final NavigableSet<Cell> set) {
Cell result = null;
@ -249,6 +248,26 @@ public abstract class AbstractMemStore implements MemStore {
return result;
}
/**
* @param cell Find the row that comes after this one. If null, we return the
* first.
* @return Next row or null if none found.
*/
@VisibleForTesting
Cell getNextRow(final Cell cell) {
Cell lowest = null;
List<Segment> segments = getSegments();
for (Segment segment : segments) {
if (lowest == null) {
//TODO: we may want to move the getNextRow ability to the segment
lowest = getNextRow(cell, segment.getCellSet());
} else {
lowest = getLowest(lowest, getNextRow(cell, segment.getCellSet()));
}
}
return lowest;
}
private Cell maybeCloneWithAllocator(Cell cell) {
return active.maybeCloneWithAllocator(cell);
}
@ -307,6 +326,6 @@ public abstract class AbstractMemStore implements MemStore {
/**
* @return an ordered list of segments from most recent to oldest in memstore
*/
protected abstract List<Segment> getSegments() throws IOException;
protected abstract List<Segment> getSegments();
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
@ -72,6 +73,7 @@ public class CompactingMemStore extends AbstractMemStore {
private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false);
@VisibleForTesting
private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
private boolean compositeSnapshot = true;
public static final long DEEP_OVERHEAD = AbstractMemStore.DEEP_OVERHEAD
+ 6 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline,
@ -160,7 +162,12 @@ public class CompactingMemStore extends AbstractMemStore {
stopCompaction();
pushActiveToPipeline(this.active);
snapshotId = EnvironmentEdgeManager.currentTime();
pushTailToSnapshot();
// in both cases whatever is pushed to snapshot is cleared from the pipeline
if (compositeSnapshot) {
pushPipelineToSnapshot();
} else {
pushTailToSnapshot();
}
}
return new MemStoreSnapshot(snapshotId, this.snapshot);
}
@ -173,8 +180,13 @@ public class CompactingMemStore extends AbstractMemStore {
public MemstoreSize getFlushableSize() {
MemstoreSize snapshotSize = getSnapshotSize();
if (snapshotSize.getDataSize() == 0) {
// if snapshot is empty the tail of the pipeline is flushed
snapshotSize = pipeline.getTailSize();
// if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed
if (compositeSnapshot) {
snapshotSize = pipeline.getPipelineSize();
snapshotSize.incMemstoreSize(this.active.keySize(), this.active.heapOverhead());
} else {
snapshotSize = pipeline.getTailSize();
}
}
return snapshotSize.getDataSize() > 0 ? snapshotSize
: new MemstoreSize(this.active.keySize(), this.active.heapOverhead());
@ -213,16 +225,28 @@ public class CompactingMemStore extends AbstractMemStore {
}
}
// the getSegments() method is used for tests only
@VisibleForTesting
@Override
public List<Segment> getSegments() {
List<Segment> pipelineList = pipeline.getSegments();
List<Segment> list = new ArrayList<Segment>(pipelineList.size() + 2);
list.add(this.active);
list.addAll(pipelineList);
list.add(this.snapshot);
list.addAll(this.snapshot.getAllSegments());
return list;
}
// the following three methods allow to manipulate the settings of composite snapshot
public void setCompositeSnapshot(boolean useCompositeSnapshot) {
this.compositeSnapshot = useCompositeSnapshot;
}
public boolean isCompositeSnapshot() {
return this.compositeSnapshot;
}
public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result,
boolean merge) {
return pipeline.swap(versionedList, result, !merge);
@ -262,18 +286,21 @@ public class CompactingMemStore extends AbstractMemStore {
* Scanners are ordered from 0 (oldest) to newest in increasing order.
*/
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
List<Segment> pipelineList = pipeline.getSegments();
long order = pipelineList.size();
// The list of elements in pipeline + the active element + the snapshot segment
// TODO : This will change when the snapshot is made of more than one element
List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(pipelineList.size() + 2);
list.add(this.active.getScanner(readPt, order + 1));
for (Segment item : pipelineList) {
list.add(item.getScanner(readPt, order));
order--;
}
list.add(this.snapshot.getScanner(readPt, order));
return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(getComparator(), list));
int order = 1; // for active segment
order += pipeline.size(); // for all segments in the pipeline
order += snapshot.getNumOfSegments(); // for all segments in the snapshot
// TODO: check alternatives to using this order
// The list of elements in pipeline + the active element + the snapshot segments
// The order is the Segment ordinal
List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(order);
list.add(this.active.getScanner(readPt, order));
order--;
list.addAll(pipeline.getScanners(readPt,order));
order -= pipeline.size();
list.addAll(snapshot.getScanners(readPt,order));
return Collections.<KeyValueScanner>singletonList(new MemStoreScanner(getComparator(), list));
}
/**
@ -380,6 +407,14 @@ public class CompactingMemStore extends AbstractMemStore {
}
}
private void pushPipelineToSnapshot() {
List<ImmutableSegment> segments = pipeline.drain();
if (!segments.isEmpty()) {
this.snapshot =
SegmentFactory.instance().createCompositeImmutableSegment(getComparator(),segments);
}
}
private RegionServicesForStores getRegionServices() {
return regionServices;
}
@ -427,24 +462,6 @@ public class CompactingMemStore extends AbstractMemStore {
compactor.initiateAction(compactionType);
}
/**
* @param cell Find the row that comes after this one. If null, we return the
* first.
* @return Next row or null if none found.
*/
Cell getNextRow(final Cell cell) {
Cell lowest = null;
List<Segment> segments = getSegments();
for (Segment segment : segments) {
if (lowest == null) {
lowest = getNextRow(cell, segment.getCellSet());
} else {
lowest = getLowest(lowest, getNextRow(cell, segment.getCellSet()));
}
}
return lowest;
}
// debug method
public void debug() {
String msg = "active size=" + this.active.keySize();

View File

@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@ -77,6 +78,19 @@ public class CompactionPipeline {
}
}
public List<ImmutableSegment> drain() {
int drainSize = pipeline.size();
List<ImmutableSegment> result = new ArrayList<ImmutableSegment>(drainSize);
synchronized (pipeline){
version++;
for(int i=0; i<drainSize; i++) {
ImmutableSegment segment = this.pipeline.removeFirst();
result.add(i,segment);
}
return result;
}
}
public VersionedSegmentsList getVersionedList() {
synchronized (pipeline){
LinkedList<ImmutableSegment> segmentList = new LinkedList<ImmutableSegment>(pipeline);
@ -193,8 +207,7 @@ public class CompactionPipeline {
public List<Segment> getSegments() {
synchronized (pipeline){
List<Segment> res = new LinkedList<Segment>(pipeline);
return res;
return new LinkedList<Segment>(pipeline);
}
}
@ -202,6 +215,18 @@ public class CompactionPipeline {
return pipeline.size();
}
public List<KeyValueScanner> getScanners(long readPoint, long order) {
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(this.pipeline.size());
for (Segment segment : this.pipeline) {
scanners.add(segment.getScanner(readPoint, order));
// The order is the Segment ordinal
order--;
assert order>0; // order should never be negative so this is just a sanity check
}
return scanners;
}
public long getMinSequenceId() {
long minSequenceId = Long.MAX_VALUE;
if (!isEmpty()) {
@ -215,6 +240,11 @@ public class CompactionPipeline {
return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead());
}
public MemstoreSize getPipelineSize() {
if (isEmpty()) return MemstoreSize.EMPTY_SIZE;
return new MemstoreSize(getSegmentsKeySize(pipeline), getSegmentsHeapOverhead(pipeline));
}
private void swapSuffix(List<ImmutableSegment> suffix, ImmutableSegment segment,
boolean closeSegmentsInSuffix) {
version++;

View File

@ -0,0 +1,352 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.SortedSet;
/**
* The CompositeImmutableSegments is created as a collection of ImmutableSegments and supports
* the interface of a single ImmutableSegments.
* The CompositeImmutableSegments is planned to be used only as a snapshot,
* thus only relevant interfaces are supported
*/
@InterfaceAudience.Private
public class CompositeImmutableSegment extends ImmutableSegment {
private final List<ImmutableSegment> segments;
private final CellComparator comparator;
// CompositeImmutableSegment is used for snapshots and snapshot should
// support getTimeRangeTracker() interface.
// Thus we hold a constant TRT build in the construction time from TRT of the given segments.
private final TimeRangeTracker timeRangeTracker;
private long keySize = 0;
// This scanner need to be remembered in order to close it when the snapshot is cleared.
// Initially CollectionBackedScanner didn't raise the scanner counters thus there was no
// need to close it. Now when MemStoreScanner is used instead we need to decrease the
// scanner counters.
private KeyValueScanner flushingScanner = null;
public CompositeImmutableSegment(CellComparator comparator, List<ImmutableSegment> segments) {
super(comparator);
this.comparator = comparator;
this.segments = segments;
this.timeRangeTracker = new TimeRangeTracker();
for (ImmutableSegment s : segments) {
this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMax());
this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMin());
this.keySize += s.keySize();
}
}
@VisibleForTesting
public List<Segment> getAllSegments() {
return new LinkedList<Segment>(segments);
}
public long getNumOfSegments() {
return segments.size();
}
/**
* Builds a special scanner for the MemStoreSnapshot object that is different than the
* general segment scanner.
* @return a special scanner for the MemStoreSnapshot object
*/
public KeyValueScanner getKeyValueScanner() {
KeyValueScanner scanner;
List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(segments.size());
for (ImmutableSegment s : segments) {
list.add(s.getScanner(Long.MAX_VALUE));
}
try {
scanner = new MemStoreScanner(getComparator(), list);
} catch (IOException ie) {
throw new IllegalStateException(ie);
}
flushingScanner = scanner;
return scanner;
}
@Override
public List<KeyValueScanner> getScanners(long readPoint, long order) {
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(this.segments.size());
for (Segment segment : this.segments) {
scanners.add(segment.getScanner(readPoint, order));
// The order is the Segment ordinal
order--;
// order should never be negative so this is just a sanity check
order = (order<0) ? 0 : order;
}
return scanners;
}
/**
* @return whether the segment has any cells
*/
public boolean isEmpty() {
for (ImmutableSegment s : segments) {
if (!s.isEmpty()) return false;
}
return true;
}
/**
* @return number of cells in segment
*/
public int getCellsCount() {
int result = 0;
for (ImmutableSegment s : segments) {
result += s.getCellsCount();
}
return result;
}
/**
* @return the first cell in the segment that has equal or greater key than the given cell
*/
public Cell getFirstAfter(Cell cell) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
/**
* Closing a segment before it is being discarded
*/
public void close() {
if (flushingScanner != null) {
flushingScanner.close();
flushingScanner = null;
}
for (ImmutableSegment s : segments) {
s.close();
}
}
/**
* If the segment has a memory allocator the cell is being cloned to this space, and returned;
* otherwise the given cell is returned
* @return either the given cell or its clone
*/
public Cell maybeCloneWithAllocator(Cell cell) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
public boolean shouldSeek(Scan scan, long oldestUnexpiredTS){
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
public long getMinTimestamp(){
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
/**
* Creates the scanner for the given read point
* @return a scanner for the given read point
*/
public KeyValueScanner getScanner(long readPoint) {
KeyValueScanner resultScanner;
List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(segments.size());
for (ImmutableSegment s : segments) {
list.add(s.getScanner(readPoint));
}
try {
resultScanner = new MemStoreScanner(getComparator(), list);
} catch (IOException ie) {
throw new IllegalStateException(ie);
}
return resultScanner;
}
/**
* Creates the scanner for the given read point, and a specific order in a list
* @return a scanner for the given read point
*/
public KeyValueScanner getScanner(long readPoint, long order) {
KeyValueScanner resultScanner;
List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(segments.size());
for (ImmutableSegment s : segments) {
list.add(s.getScanner(readPoint,order));
}
try {
resultScanner = new MemStoreScanner(getComparator(), list);
} catch (IOException ie) {
throw new IllegalStateException(ie);
}
return resultScanner;
}
public boolean isTagsPresent() {
for (ImmutableSegment s : segments) {
if (s.isTagsPresent()) return true;
}
return false;
}
public void incScannerCount() {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
public void decScannerCount() {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
/**
* Setting the CellSet of the segment - used only for flat immutable segment for setting
* immutable CellSet after its creation in immutable segment constructor
* @return this object
*/
protected CompositeImmutableSegment setCellSet(CellSet cellSetOld, CellSet cellSetNew) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
/**
* @return Sum of all cell's size.
*/
public long keySize() {
return this.keySize;
}
/**
* @return The heap overhead of this segment.
*/
public long heapOverhead() {
long result = 0;
for (ImmutableSegment s : segments) {
result += s.heapOverhead();
}
return result;
}
/**
* Updates the heap size counter of the segment by the given delta
*/
protected void incSize(long delta, long heapOverhead) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
protected void incHeapOverheadSize(long delta) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
public long getMinSequenceId() {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
public TimeRangeTracker getTimeRangeTracker() {
return this.timeRangeTracker;
}
//*** Methods for SegmentsScanner
public Cell last() {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
public Iterator<Cell> iterator() {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
public SortedSet<Cell> headSet(Cell firstKeyOnRow) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
public int compare(Cell left, Cell right) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
public int compareRows(Cell left, Cell right) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
/**
* @return a set of all cells in the segment
*/
protected CellSet getCellSet() {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
/**
* Returns the Cell comparator used by this segment
* @return the Cell comparator used by this segment
*/
protected CellComparator getComparator() {
return comparator;
}
protected void internalAdd(Cell cell, boolean mslabUsed, MemstoreSize memstoreSize) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed,
MemstoreSize memstoreSize) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
protected long heapOverheadChange(Cell cell, boolean succ) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
/**
* Returns a subset of the segment cell set, which starts with the given cell
* @param firstCell a cell in the segment
* @return a subset of the segment cell set, which starts with the given cell
*/
protected SortedSet<Cell> tailSet(Cell firstCell) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
// Debug methods
/**
* Dumps all cells of the segment into the given log
*/
void dump(Log log) {
for (ImmutableSegment s : segments) {
s.dump(log);
}
}
@Override
public String toString() {
StringBuilder sb =
new StringBuilder("This is CompositeImmutableSegment and those are its segments:: ");
for (ImmutableSegment s : segments) {
sb.append(s.toString());
}
return sb.toString();
}
}

View File

@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -127,30 +128,20 @@ public class DefaultMemStore extends AbstractMemStore {
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(2);
list.add(this.active.getScanner(readPt, 1));
list.add(this.snapshot.getScanner(readPt, 0));
return Collections.<KeyValueScanner> singletonList(
new MemStoreScanner(getComparator(), list));
list.addAll(this.snapshot.getScanners(readPt, 0));
return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(getComparator(), list));
}
// the getSegments() method is used for tests only
@VisibleForTesting
@Override
protected List<Segment> getSegments() throws IOException {
protected List<Segment> getSegments() {
List<Segment> list = new ArrayList<Segment>(2);
list.add(this.active);
list.add(this.snapshot);
list.addAll(this.snapshot.getAllSegments());
return list;
}
/**
* @param cell Find the row that comes after this one. If null, we return the
* first.
* @return Next row or null if none found.
*/
Cell getNextRow(final Cell cell) {
return getLowest(
getNextRow(cell, this.active.getCellSet()),
getNextRow(cell, this.snapshot.getCellSet()));
}
@Override public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) {
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@ -6429,8 +6430,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
final Configuration conf, final HTableDescriptor hTableDescriptor,
final WAL wal, final boolean initialize)
throws IOException {
LOG.info("creating HRegion " + info.getTable().getNameAsString()
+ " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
LOG.info("creating HRegion " + info.getTable().getNameAsString() + " HTD == " + hTableDescriptor
+ " RootDir = " + rootDir +
" Table name == " + info.getTable().getNameAsString());
FileSystem fs = FileSystem.get(conf);
Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());

View File

@ -30,6 +30,10 @@ import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.CollectionBackedScanner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
/**
* ImmutableSegment is an abstract class that extends the API supported by a {@link Segment},
@ -68,6 +72,14 @@ public class ImmutableSegment extends Segment {
}
///////////////////// CONSTRUCTORS /////////////////////
/**------------------------------------------------------------------------
* Empty C-tor to be used only for CompositeImmutableSegment
*/
protected ImmutableSegment(CellComparator comparator) {
super(comparator);
this.timeRange = null;
}
/**------------------------------------------------------------------------
* Copy C-tor to be used when new ImmutableSegment is being built from a Mutable one.
* This C-tor should be used when active MutableSegment is pushed into the compaction
@ -142,6 +154,15 @@ public class ImmutableSegment extends Segment {
return this.timeRange.getMin();
}
public long getNumOfSegments() {
return 1;
}
public List<Segment> getAllSegments() {
List<Segment> res = new ArrayList<Segment>(Arrays.asList(this));
return res;
}
/**------------------------------------------------------------------------
* Change the CellSet of this ImmutableSegment from one based on ConcurrentSkipListMap to one
* based on CellArrayMap.
@ -232,7 +253,7 @@ public class ImmutableSegment extends Segment {
Cell curCell;
int idx = 0;
// create this segment scanner with maximal possible read point, to go over all Cells
SegmentScanner segmentScanner = this.getScanner(Long.MAX_VALUE);
KeyValueScanner segmentScanner = this.getScanner(Long.MAX_VALUE);
try {
while ((curCell = segmentScanner.next()) != null) {

View File

@ -56,7 +56,7 @@ public class MemStoreCompactor {
// The upper bound for the number of segments we store in the pipeline prior to merging.
// This constant is subject to further experimentation.
private static final int THRESHOLD_PIPELINE_SEGMENTS = 1;
private static final int THRESHOLD_PIPELINE_SEGMENTS = 30; // stands here for infinity
private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class);
@ -276,6 +276,8 @@ public class MemStoreCompactor {
case NONE: action = Action.NOOP;
break;
case BASIC: action = Action.MERGE;
// if multiple segments appear in the pipeline flush them to the disk later together
compactingMemStore.setCompositeSnapshot(true);
break;
case EAGER: action = Action.COMPACT;
break;

View File

@ -25,19 +25,32 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
@InterfaceAudience.Private
public class MemstoreSize {
static final MemstoreSize EMPTY_SIZE = new MemstoreSize();
private long dataSize;
private long heapOverhead;
final private boolean isEmpty;
static final MemstoreSize EMPTY_SIZE = new MemstoreSize(true);
public MemstoreSize() {
dataSize = 0;
heapOverhead = 0;
isEmpty = false;
}
public MemstoreSize(boolean isEmpty) {
dataSize = 0;
heapOverhead = 0;
this.isEmpty = isEmpty;
}
public boolean isEmpty() {
return isEmpty;
}
public MemstoreSize(long dataSize, long heapOverhead) {
this.dataSize = dataSize;
this.heapOverhead = heapOverhead;
this.isEmpty = false;
}
public void incMemstoreSize(long dataSize, long heapOverhead) {
@ -61,11 +74,13 @@ public class MemstoreSize {
}
public long getDataSize() {
return dataSize;
return isEmpty ? 0 : dataSize;
}
public long getHeapOverhead() {
return heapOverhead;
return isEmpty ? 0 : heapOverhead;
}
@Override
@ -74,7 +89,7 @@ public class MemstoreSize {
return false;
}
MemstoreSize other = (MemstoreSize) obj;
return this.dataSize == other.dataSize && this.heapOverhead == other.heapOverhead;
return getDataSize() == other.dataSize && getHeapOverhead() == other.heapOverhead;
}
@Override

View File

@ -18,7 +18,9 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -64,6 +66,15 @@ public abstract class Segment {
protected final TimeRangeTracker timeRangeTracker;
protected volatile boolean tagsPresent;
// Empty constructor to be used when Segment is used as interface,
// and there is no need in true Segments state
protected Segment(CellComparator comparator) {
this.comparator = comparator;
this.dataSize = new AtomicLong(0);
this.heapOverhead = new AtomicLong(0);
this.timeRangeTracker = new TimeRangeTracker();
}
// This constructor is used to create empty Segments.
protected Segment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
this.cellSet.set(cellSet);
@ -91,7 +102,7 @@ public abstract class Segment {
* Creates the scanner for the given read point
* @return a scanner for the given read point
*/
public SegmentScanner getScanner(long readPoint) {
public KeyValueScanner getScanner(long readPoint) {
return new SegmentScanner(this, readPoint);
}
@ -99,10 +110,16 @@ public abstract class Segment {
* Creates the scanner for the given read point, and a specific order in a list
* @return a scanner for the given read point
*/
public SegmentScanner getScanner(long readPoint, long order) {
public KeyValueScanner getScanner(long readPoint, long order) {
return new SegmentScanner(this, readPoint, order);
}
public List<KeyValueScanner> getScanners(long readPoint, long order) {
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(1);
scanners.add(getScanner(readPoint, order));
return scanners;
}
/**
* @return whether the segment has any cells
*/

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
@ -47,6 +48,13 @@ public final class SegmentFactory {
return new ImmutableSegment(comparator, iterator, MemStoreLAB.newInstance(conf));
}
// create composite immutable segment from a list of segments
public CompositeImmutableSegment createCompositeImmutableSegment(
final CellComparator comparator, List<ImmutableSegment> segments) {
return new CompositeImmutableSegment(comparator, segments);
}
// create new flat immutable segment from compacting old immutable segments
public ImmutableSegment createImmutableSegmentByCompaction(final Configuration conf,
final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
@ -102,6 +110,9 @@ public final class SegmentFactory {
private MemStoreLAB getMergedMemStoreLAB(Configuration conf, List<ImmutableSegment> segments) {
List<MemStoreLAB> mslabs = new ArrayList<MemStoreLAB>();
if (!conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) {
return null;
}
for (ImmutableSegment segment : segments) {
mslabs.add(segment.getMemStoreLAB());
}

View File

@ -137,6 +137,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
this.memstore = new CompactingMemStore(HBaseConfiguration.create(),
CellComparator.COMPARATOR, store, regionServicesForStores,
HColumnDescriptor.MemoryCompaction.EAGER);
this.memstore.add(kv1.clone(), null);
// As compaction is starting in the background the repetition
// of the k1 might be removed BUT the scanners created earlier
@ -177,6 +178,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
// Add more versions to make it a little more interesting.
Thread.sleep(1);
addRows(this.memstore);
((CompactingMemStore)this.memstore).setCompositeSnapshot(true);
Cell closestToEmpty = ((CompactingMemStore)this.memstore).getNextRow(KeyValue.LOWESTKEY);
assertTrue(CellComparator.COMPARATOR.compareRows(closestToEmpty,
new KeyValue(Bytes.toBytes(0), System.currentTimeMillis())) == 0);
@ -277,7 +281,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
this.memstore.upsert(l, 2, null);// readpoint is 2
MemstoreSize newSize = this.memstore.size();
assert (newSize.getDataSize() > oldSize.getDataSize());
assertTrue("\n<<< The old size is " + oldSize.getDataSize() + " and the new size is "
+ newSize.getDataSize() + "\n",
newSize.getDataSize() > oldSize.getDataSize());
//The kv1 should be removed.
assert (memstore.getActive().getCellsCount() == 2);

View File

@ -65,8 +65,6 @@ import org.junit.rules.TestName;
import org.junit.rules.TestRule;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -180,6 +178,10 @@ public class TestDefaultMemStore {
// Now assert can count same number even if a snapshot mid-scan.
s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
count = 0;
// assertTrue("\n<<< The memstore scanners without snapshot are: \n" + memstorescanners
// + "\n",false);
try {
while (s.next(result)) {
LOG.info(result);
@ -207,8 +209,10 @@ public class TestDefaultMemStore {
s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
count = 0;
int snapshotIndex = 5;
try {
while (s.next(result)) {
LOG.info(result);
// Assert the stuff is coming out in right order.
assertTrue(CellUtil.matchingRow(result.get(0), Bytes.toBytes(count)));
@ -216,6 +220,7 @@ public class TestDefaultMemStore {
assertEquals("count=" + count + ", result=" + result, rowCount, result.size());
count++;
if (count == snapshotIndex) {
MemStoreSnapshot snapshot = this.memstore.snapshot();
this.memstore.clearSnapshot(snapshot.getId());
// Added more rows into kvset. But the scanner wont see these rows.
@ -227,7 +232,8 @@ public class TestDefaultMemStore {
} finally {
s.close();
}
assertEquals(rowCount, count);
assertEquals("\n<<< The row count is " + rowCount + " and the iteration count is " + count,
rowCount, count);
}
/**

View File

@ -22,13 +22,7 @@ import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@ -38,6 +32,7 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -55,40 +50,48 @@ public class TestWalAndCompactingMemStoreFlush {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");
public static final TableName TABLENAME = TableName.valueOf("TestWalAndCompactingMemStoreFlush",
"t1");
public static final TableName TABLENAME =
TableName.valueOf("TestWalAndCompactingMemStoreFlush", "t1");
public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };
public static final byte[][] FAMILIES =
{ Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };
public static final byte[] FAMILY1 = FAMILIES[0];
public static final byte[] FAMILY2 = FAMILIES[1];
public static final byte[] FAMILY3 = FAMILIES[2];
private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException {
int i=0;
MemstoreSize memstrsize1 = MemstoreSize.EMPTY_SIZE;
assertEquals(memstrsize1.getDataSize(), 0);
assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0);
int i = 0;
HTableDescriptor htd = new HTableDescriptor(TABLENAME);
for (byte[] family : FAMILIES) {
HColumnDescriptor hcd = new HColumnDescriptor(family);
// even column families are going to have compacted memstore
if(i%2 == 0) {
hcd.setInMemoryCompaction(HColumnDescriptor.MemoryCompaction.valueOf(
conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY)));
} else {
hcd.setInMemoryCompaction(HColumnDescriptor.MemoryCompaction.NONE);
}
htd.addFamily(hcd);
i++;
}
assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0);
HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false);
Path path = new Path(DIR, callingMethod);
return HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd);
assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0);
HRegion result = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd);
assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0);
return result;
}
// A helper function to create puts.
private Put createPut(int familyNum, int putNum) {
byte[] qf = Bytes.toBytes("q" + familyNum);
byte[] qf = Bytes.toBytes("q" + familyNum);
byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
Put p = new Put(row);
@ -98,7 +101,7 @@ public class TestWalAndCompactingMemStoreFlush {
// A helper function to create double puts, so something can be compacted later.
private Put createDoublePut(int familyNum, int putNum) {
byte[] qf = Bytes.toBytes("q" + familyNum);
byte[] qf = Bytes.toBytes("q" + familyNum);
byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
Put p = new Put(row);
@ -122,16 +125,21 @@ public class TestWalAndCompactingMemStoreFlush {
byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family));
assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum),
r.getFamilyMap(family).get(qf));
r.getFamilyMap(family).get(qf));
assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum),
Arrays.equals(r.getFamilyMap(family).get(qf), val));
Arrays.equals(r.getFamilyMap(family).get(qf), val));
}
@Before public void setUp() throws Exception {
assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0);
}
// test selective flush with data-compaction
@Test(timeout = 180000)
public void testSelectiveFlushWithEager() throws IOException {
// Set up the configuration
Configuration conf = HBaseConfiguration.create();
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
FlushNonSloppyStoresFirstPolicy.class.getName());
@ -175,17 +183,14 @@ public class TestWalAndCompactingMemStoreFlush {
MemstoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getSizeOfMemStore();
// Get the overall smallest LSN in the region's memstores.
long smallestSeqInRegionCurrentMemstorePhaseI = getWAL(region)
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqInRegionCurrentMemstorePhaseI =
getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
String s = "\n\n----------------------------------\n"
+ "Upon initial insert and before any flush, size of CF1 is:"
+ cf1MemstoreSizePhaseI + ", is CF1 compacted memstore?:"
+ region.getStore(FAMILY1).isSloppyMemstore() + ". Size of CF2 is:"
+ cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:"
+ region.getStore(FAMILY2).isSloppyMemstore() + ". Size of CF3 is:"
+ cf3MemstoreSizePhaseI + ", is CF3 compacted memstore?:"
+ region.getStore(FAMILY3).isSloppyMemstore() + "\n";
+ "Upon initial insert and before any flush, size of CF1 is:" + cf1MemstoreSizePhaseI + ", is CF1 compacted memstore?:"
+ region.getStore(FAMILY1).isSloppyMemstore() + ". Size of CF2 is:" + cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:"
+ region.getStore(FAMILY2).isSloppyMemstore() + ". Size of CF3 is:" + cf3MemstoreSizePhaseI
+ ", is CF3 compacted memstore?:" + region.getStore(FAMILY3).isSloppyMemstore() + "\n";
// The overall smallest LSN in the region's memstores should be the same as
// the LSN of the smallest edit in CF1
@ -200,12 +205,12 @@ public class TestWalAndCompactingMemStoreFlush {
// The total memstore size should be the same as the sum of the sizes of
// memstores of CF1, CF2 and CF3.
String msg = "totalMemstoreSize="+totalMemstoreSize +
" cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI +
" cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI +
" cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ;
assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
+ cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
String msg = "totalMemstoreSize=" + totalMemstoreSize +
" cf1MemstoreSizePhaseI=" + cf1MemstoreSizePhaseI +
" cf2MemstoreSizePhaseI=" + cf2MemstoreSizePhaseI +
" cf3MemstoreSizePhaseI=" + cf3MemstoreSizePhaseI;
assertEquals(msg, totalMemstoreSize,
cf1MemstoreSizePhaseI.getDataSize() + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
// Flush!!!!!!!!!!!!!!!!!!!!!!
// We have big compacting memstore CF1 and two small memstores:
@ -225,8 +230,8 @@ public class TestWalAndCompactingMemStoreFlush {
MemstoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getSizeOfMemStore();
MemstoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getSizeOfMemStore();
long smallestSeqInRegionCurrentMemstorePhaseII = getWAL(region)
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqInRegionCurrentMemstorePhaseII =
getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
// Find the smallest LSNs for edits wrt to each CF.
long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
@ -260,16 +265,20 @@ public class TestWalAndCompactingMemStoreFlush {
s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseII
+ ", the smallest sequence in CF1:" + smallestSeqCF1PhaseII + ", " +
"the smallest sequence in CF2:"
+ smallestSeqCF2PhaseII +", the smallest sequence in CF3:" + smallestSeqCF3PhaseII + "\n";
"the smallest sequence in CF2:" + smallestSeqCF2PhaseII + ", the smallest sequence in CF3:"
+ smallestSeqCF3PhaseII + "\n";
// How much does the CF1 memstore occupy? Will be used later.
MemstoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getSizeOfMemStore();
long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);
s = s + "----After more puts into CF1 its size is:" + cf1MemstoreSizePhaseIII
+ ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n" ;
+ ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n"
+ "The sizes of snapshots are cf1: " + region.getStore(FAMILY1).getFlushedCellsSize()
+ ", cf2: " + region.getStore(FAMILY2).getFlushedCellsSize() + ", cf3: " + region
.getStore(FAMILY3).getFlushedCellsSize() + ", cf4: " + region.getStore(FAMILIES[4])
.getFlushedCellsSize() + "; the entire region size is: " + region.getMemstoreSize() + "\n";
;
// Flush!!!!!!!!!!!!!!!!!!!!!!
// Flush again, CF1 is flushed to disk
@ -282,21 +291,22 @@ public class TestWalAndCompactingMemStoreFlush {
MemstoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getSizeOfMemStore();
MemstoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getSizeOfMemStore();
long smallestSeqInRegionCurrentMemstorePhaseIV = getWAL(region)
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqInRegionCurrentMemstorePhaseIV =
getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
s = s + "----After SECOND FLUSH, CF1 size is:" + cf1MemstoreSizePhaseIV + ", CF2 size is:"
+ cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV
+ "\n";
+ cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV + "\n" + "The sizes of snapshots are cf1: " + region.getStore(FAMILY1).getFlushedCellsSize()
+ ", cf2: " + region.getStore(FAMILY2).getFlushedCellsSize() + ", cf3: " + region
.getStore(FAMILY3).getFlushedCellsSize() + ", cf4: " + region.getStore(FAMILIES[4])
.getFlushedCellsSize() + "\n";
s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV
+ ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " +
"the smallest sequence in CF2:"
+ smallestSeqCF2PhaseIV +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIV
+ "\n";
"the smallest sequence in CF2:" + smallestSeqCF2PhaseIV + ", the smallest sequence in CF3:"
+ smallestSeqCF3PhaseIV + "\n" + "the entire region size is: " + region.getMemstoreSize() + "\n";
// CF1's pipeline component (inserted before first flush) should be flushed to disk
// CF2 should be flushed to disk
@ -321,13 +331,21 @@ public class TestWalAndCompactingMemStoreFlush {
MemstoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getSizeOfMemStore();
MemstoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getSizeOfMemStore();
MemstoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getSizeOfMemStore();
long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region)
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqInRegionCurrentMemstorePhaseV =
getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
assertEquals(MemstoreSize.EMPTY_SIZE , cf1MemstoreSizePhaseV);
assertEquals(MemstoreSize.EMPTY_SIZE, cf1MemstoreSizePhaseV);
assertEquals(MemstoreSize.EMPTY_SIZE, cf2MemstoreSizePhaseV);
assertEquals(MemstoreSize.EMPTY_SIZE, cf3MemstoreSizePhaseV);
s = s + "----AFTER THIRD FLUSH, the entire region size is:" + region.getMemstoreSize()
+ " (empty memstore size is " + MemstoreSize.EMPTY_SIZE
+ "), while the sizes of each memstore are as following \ncf1: " + cf1MemstoreSizePhaseV
+ ", cf2: " + cf2MemstoreSizePhaseV + ", cf3: " + cf3MemstoreSizePhaseV + ", cf4: " + region
.getStore(FAMILIES[4]).getSizeOfMemStore() + "\n" + "The sizes of snapshots are cf1: " + region.getStore(FAMILY1).getFlushedCellsSize()
+ ", cf2: " + region.getStore(FAMILY2).getFlushedCellsSize() + ", cf3: " + region.getStore(FAMILY3).getFlushedCellsSize()
+ ", cf4: " + region.getStore(FAMILIES[4]).getFlushedCellsSize() + "\n";
// What happens when we hit the memstore limit, but we are not able to find
// any Column Family above the threshold?
// In that case, we should flush all the CFs.
@ -345,24 +363,22 @@ public class TestWalAndCompactingMemStoreFlush {
region.flush(false);
s = s + "----AFTER THIRD AND FORTH FLUSH, The smallest sequence in region WAL is: "
s = s + "----AFTER FORTH FLUSH, The smallest sequence in region WAL is: "
+ smallestSeqInRegionCurrentMemstorePhaseV
+ ". After additional inserts and last flush, the entire region size is:" + region
.getMemstoreSize()
+ "\n----------------------------------\n";
.getMemstoreSize() + "\n----------------------------------\n";
// Since we won't find any CF above the threshold, and hence no specific
// store to flush, we should flush all the memstores
// Also compacted memstores are flushed to disk.
assertEquals(0, region.getMemstoreSize());
assertEquals(s, 0, region.getMemstoreSize());
System.out.println(s);
HBaseTestingUtility.closeRegionAndWAL(region);
}
/*------------------------------------------------------------------------------*/
/* Check the same as above but for index-compaction type of compacting memstore */
@Test(timeout = 180000)
public void testSelectiveFlushWithIndexCompaction() throws IOException {
@Test(timeout = 180000) public void testSelectiveFlushWithIndexCompaction() throws IOException {
/*------------------------------------------------------------------------------*/
/* SETUP */
@ -379,7 +395,7 @@ public class TestWalAndCompactingMemStoreFlush {
// Initialize the region
Region region = initHRegion("testSelectiveFlushWithIndexCompaction", conf);
assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0);
/*------------------------------------------------------------------------------*/
/* PHASE I - insertions */
// Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
@ -410,8 +426,8 @@ public class TestWalAndCompactingMemStoreFlush {
MemstoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getSizeOfMemStore();
MemstoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getSizeOfMemStore();
// Get the overall smallest LSN in the region's memstores.
long smallestSeqInRegionCurrentMemstorePhaseI = getWAL(region)
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqInRegionCurrentMemstorePhaseI =
getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
/*------------------------------------------------------------------------------*/
/* PHASE I - validation */
@ -427,8 +443,8 @@ public class TestWalAndCompactingMemStoreFlush {
// The total memstore size should be the same as the sum of the sizes of
// memstores of CF1, CF2 and CF3.
assertEquals(totalMemstoreSizePhaseI, cf1MemstoreSizePhaseI.getDataSize()
+ cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
assertEquals(totalMemstoreSizePhaseI,
cf1MemstoreSizePhaseI.getDataSize() + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
/*------------------------------------------------------------------------------*/
/* PHASE I - Flush */
@ -459,8 +475,8 @@ public class TestWalAndCompactingMemStoreFlush {
MemstoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getSizeOfMemStore();
MemstoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getSizeOfMemStore();
MemstoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getSizeOfMemStore();
long smallestSeqInRegionCurrentMemstorePhaseII = getWAL(region)
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqInRegionCurrentMemstorePhaseII =
getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
// Find the smallest LSNs for edits wrt to each CF.
long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
long totalMemstoreSizePhaseII = region.getMemstoreSize();
@ -468,13 +484,13 @@ public class TestWalAndCompactingMemStoreFlush {
/*------------------------------------------------------------------------------*/
/* PHASE II - validation */
// CF1 was flushed to memory, should be flattened and take less space
assertEquals(cf1MemstoreSizePhaseII.getDataSize() , cf1MemstoreSizePhaseI.getDataSize());
assertEquals(cf1MemstoreSizePhaseII.getDataSize(), cf1MemstoreSizePhaseI.getDataSize());
assertTrue(cf1MemstoreSizePhaseII.getHeapOverhead() < cf1MemstoreSizePhaseI.getHeapOverhead());
// CF2 should become empty
assertEquals(MemstoreSize.EMPTY_SIZE, cf2MemstoreSizePhaseII);
// verify that CF3 was flushed to memory and was not compacted (this is an approximation check)
// if compacted CF# should be at least twice less because its every key was duplicated
assertEquals(cf3MemstoreSizePhaseII.getDataSize() , cf3MemstoreSizePhaseI.getDataSize());
assertEquals(cf3MemstoreSizePhaseII.getDataSize(), cf3MemstoreSizePhaseI.getDataSize());
assertTrue(
cf3MemstoreSizePhaseI.getHeapOverhead() / 2 < cf3MemstoreSizePhaseII.getHeapOverhead());
@ -484,8 +500,8 @@ public class TestWalAndCompactingMemStoreFlush {
// The total memstore size should be the same as the sum of the sizes of
// memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline
// items in CF1/2
assertEquals(totalMemstoreSizePhaseII, cf1MemstoreSizePhaseII.getDataSize()
+ cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
assertEquals(totalMemstoreSizePhaseII,
cf1MemstoreSizePhaseII.getDataSize() + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
/*------------------------------------------------------------------------------*/
/*------------------------------------------------------------------------------*/
@ -513,8 +529,8 @@ public class TestWalAndCompactingMemStoreFlush {
// The total memstore size should be the same as the sum of the sizes of
// memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline
// items in CF1/2
assertEquals(totalMemstoreSizePhaseIII, cf1MemstoreSizePhaseIII.getDataSize()
+ cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
assertEquals(totalMemstoreSizePhaseIII,
cf1MemstoreSizePhaseIII.getDataSize() + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
/*------------------------------------------------------------------------------*/
/* PHASE III - Flush */
@ -530,8 +546,8 @@ public class TestWalAndCompactingMemStoreFlush {
MemstoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getSizeOfMemStore();
MemstoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getSizeOfMemStore();
MemstoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getSizeOfMemStore();
long smallestSeqInRegionCurrentMemstorePhaseIV = getWAL(region)
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqInRegionCurrentMemstorePhaseIV =
getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
/*------------------------------------------------------------------------------*/
@ -561,8 +577,8 @@ public class TestWalAndCompactingMemStoreFlush {
MemstoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getSizeOfMemStore();
MemstoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getSizeOfMemStore();
MemstoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getSizeOfMemStore();
long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region)
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqInRegionCurrentMemstorePhaseV =
getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long totalMemstoreSizePhaseV = region.getMemstoreSize();
/*------------------------------------------------------------------------------*/
@ -617,22 +633,30 @@ public class TestWalAndCompactingMemStoreFlush {
HBaseTestingUtility.closeRegionAndWAL(region);
}
@Test(timeout = 180000)
public void testSelectiveFlushAndWALinDataCompaction() throws IOException {
// test WAL behavior together with selective flush while data-compaction
@Test(timeout = 180000) public void testDCwithWAL() throws IOException {
MemstoreSize checkSize = MemstoreSize.EMPTY_SIZE;
assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0);
// Set up the configuration
Configuration conf = HBaseConfiguration.create();
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushNonSloppyStoresFirstPolicy.class
.getName());
conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 *
1024);
conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
FlushNonSloppyStoresFirstPolicy.class.getName());
conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
// set memstore to do data compaction and not to use the speculative scan
conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(HColumnDescriptor.MemoryCompaction.EAGER));
MemstoreSize memstrsize1 = MemstoreSize.EMPTY_SIZE;
assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0);
// Intialize the HRegion
HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf);
MemstoreSize cf2MemstoreSizePhase0 = region.getStore(FAMILY2).getSizeOfMemStore();
MemstoreSize cf1MemstoreSizePhase0 = region.getStore(FAMILY1).getSizeOfMemStore();
// Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
for (int i = 1; i <= 1200; i++) {
region.put(createPut(1, i));
@ -652,6 +676,7 @@ public class TestWalAndCompactingMemStoreFlush {
// Find the sizes of the memstores of each CF.
MemstoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getSizeOfMemStore();
//boolean oldCF2 = region.getStore(FAMILY2).isSloppyMemstore();
MemstoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getSizeOfMemStore();
MemstoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getSizeOfMemStore();
@ -662,16 +687,20 @@ public class TestWalAndCompactingMemStoreFlush {
// The total memstore size should be the same as the sum of the sizes of
// memstores of CF1, CF2 and CF3.
String msg = "totalMemstoreSize="+totalMemstoreSize +
" DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD +
" cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI +
" cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI +
" cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ;
assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
+ cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
String msg = "\n<<< totalMemstoreSize=" + totalMemstoreSize +
" DefaultMemStore.DEEP_OVERHEAD=" + DefaultMemStore.DEEP_OVERHEAD +
" cf1MemstoreSizePhaseI=" + cf1MemstoreSizePhaseI +
" cf2MemstoreSizePhaseI=" + cf2MemstoreSizePhaseI +
" cf3MemstoreSizePhaseI=" + cf3MemstoreSizePhaseI;
assertEquals(msg, totalMemstoreSize,
cf1MemstoreSizePhaseI.getDataSize() + cf2MemstoreSizePhaseI.getDataSize()
+ cf3MemstoreSizePhaseI.getDataSize());
// Flush!
CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
MemStore cms2 = ((HStore) region.getStore(FAMILY2)).memstore;
MemstoreSize memstrsize2 = cms2.getSnapshotSize();
MemstoreSize flshsize2 = cms2.getFlushableSize();
CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
cms1.flushInMemory();
cms3.flushInMemory();
@ -684,15 +713,22 @@ public class TestWalAndCompactingMemStoreFlush {
long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
MemstoreSize newSize = new MemstoreSize();
// CF2 should have been cleared
assertEquals(MemstoreSize.EMPTY_SIZE, cf2MemstoreSizePhaseII);
assertEquals(
msg + "\n<<< CF2 is compacting " + ((HStore) region.getStore(FAMILY2)).memstore.isSloppy()
+ ", snapshot and flushable size BEFORE flush " + memstrsize2 + "; " + flshsize2
+ ", snapshot and flushable size AFTER flush " + cms2.getSnapshotSize() + "; " + cms2
.getFlushableSize() + "\n<<< cf2 size " + cms2.size() + "; the checked size "
+ cf2MemstoreSizePhaseII + "; memstore empty size " + MemstoreSize.EMPTY_SIZE
+ "; check size " + checkSize + "\n<<< first first first CF2 size "
+ cf2MemstoreSizePhase0 + "; first first first CF1 size " + cf1MemstoreSizePhase0
+ "; new new new size " + newSize + "\n", MemstoreSize.EMPTY_SIZE,
cf2MemstoreSizePhaseII);
String s = "\n\n----------------------------------\n"
+ "Upon initial insert and flush, LSN of CF1 is:"
+ smallestSeqCF1PhaseII + ". LSN of CF2 is:"
+ smallestSeqCF2PhaseII + ". LSN of CF3 is:"
+ smallestSeqCF3PhaseII + ", smallestSeqInRegionCurrentMemstore:"
String s = "\n\n----------------------------------\n" + "Upon initial insert and flush, LSN of CF1 is:"
+ smallestSeqCF1PhaseII + ". LSN of CF2 is:" + smallestSeqCF2PhaseII + ". LSN of CF3 is:" + smallestSeqCF3PhaseII + ", smallestSeqInRegionCurrentMemstore:"
+ smallestSeqInRegionCurrentMemstorePhaseII + "\n";
// Add same entries to compact them later
@ -718,8 +754,8 @@ public class TestWalAndCompactingMemStoreFlush {
s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIII
+ ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIII + ", " +
"the smallest sequence in CF2:"
+ smallestSeqCF2PhaseIII +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIII + "\n";
"the smallest sequence in CF2:" + smallestSeqCF2PhaseIII + ", the smallest sequence in CF3:"
+ smallestSeqCF3PhaseIII + "\n";
// Flush!
cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
@ -736,20 +772,22 @@ public class TestWalAndCompactingMemStoreFlush {
s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV
+ ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " +
"the smallest sequence in CF2:"
+ smallestSeqCF2PhaseIV +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIV + "\n";
"the smallest sequence in CF2:" + smallestSeqCF2PhaseIV + ", the smallest sequence in CF3:"
+ smallestSeqCF3PhaseIV + "\n";
// now check that the LSN of the entire WAL, of CF1 and of CF3 has progressed due to compaction
assertTrue(s, smallestSeqInRegionCurrentMemstorePhaseIV >
smallestSeqInRegionCurrentMemstorePhaseIII);
assertTrue(s,
smallestSeqInRegionCurrentMemstorePhaseIV > smallestSeqInRegionCurrentMemstorePhaseIII);
assertTrue(smallestSeqCF1PhaseIV > smallestSeqCF1PhaseIII);
assertTrue(smallestSeqCF3PhaseIV > smallestSeqCF3PhaseIII);
HBaseTestingUtility.closeRegionAndWAL(region);
}
// test WAL behavior together with selective flush while index-compaction
@Test(timeout = 180000)
public void testSelectiveFlushAndWALinIndexCompaction() throws IOException {
public void tstICwithWAL() throws IOException {
// Set up the configuration
Configuration conf = HBaseConfiguration.create();
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);