HBASE-17081 Flush the entire CompactingMemStore content to disk - revert due to failure in TestHRegionWithInMemoryFlush

tedyu 2016-12-28 10:53:07 -08:00
parent da97569eae
commit 79e5efd35c
14 changed files with 174 additions and 697 deletions
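For context while reading the hunks: HBASE-17081 taught CompactingMemStore.snapshot() to push the entire compaction pipeline into one composite snapshot; this revert restores the original tail-only behavior. A condensed, non-compilable sketch of the two paths, taken from the CompactingMemStore hunks below:

// Condensed from the CompactingMemStore.snapshot() hunk below; not a
// standalone class. Pre-revert, a composite snapshot could cover the
// whole pipeline:
if (compositeSnapshot) {
  pushPipelineToSnapshot();  // drain every pipeline segment into one
                             // CompositeImmutableSegment snapshot
} else {
  pushTailToSnapshot();      // snapshot only the pipeline's tail segment
}

// Post-revert, only the tail path remains:
pushTailToSnapshot();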

AbstractMemStore.java (View File)

@@ -159,12 +159,14 @@ public abstract class AbstractMemStore implements MemStore {
   public String toString() {
     StringBuffer buf = new StringBuffer();
     int i = 1;
+    try {
     for (Segment segment : getSegments()) {
       buf.append("Segment (" + i + ") " + segment.toString() + "; ");
       i++;
     }
+    } catch (IOException e){
+      return e.toString();
+    }
     return buf.toString();
   }
@@ -230,7 +232,6 @@ public abstract class AbstractMemStore implements MemStore {
    * @return Next row or null if none found. If one found, will be a new
    * KeyValue -- can be destroyed by subsequent calls to this method.
    */
-  @VisibleForTesting
   protected Cell getNextRow(final Cell key,
       final NavigableSet<Cell> set) {
     Cell result = null;
@@ -248,26 +249,6 @@ public abstract class AbstractMemStore implements MemStore {
     return result;
   }

-  /**
-   * @param cell Find the row that comes after this one. If null, we return the
-   * first.
-   * @return Next row or null if none found.
-   */
-  @VisibleForTesting
-  Cell getNextRow(final Cell cell) {
-    Cell lowest = null;
-    List<Segment> segments = getSegments();
-    for (Segment segment : segments) {
-      if (lowest == null) {
-        //TODO: we may want to move the getNextRow ability to the segment
-        lowest = getNextRow(cell, segment.getCellSet());
-      } else {
-        lowest = getLowest(lowest, getNextRow(cell, segment.getCellSet()));
-      }
-    }
-    return lowest;
-  }
-
   private Cell maybeCloneWithAllocator(Cell cell) {
     return active.maybeCloneWithAllocator(cell);
   }
@@ -326,6 +307,6 @@ public abstract class AbstractMemStore implements MemStore {
   /**
    * @return an ordered list of segments from most recent to oldest in memstore
    */
-  protected abstract List<Segment> getSegments();
+  protected abstract List<Segment> getSegments() throws IOException;
 }

CompactingMemStore.java (View File)

@@ -72,7 +72,6 @@ public class CompactingMemStore extends AbstractMemStore {
   private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false);
   @VisibleForTesting
   private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
-  private boolean compositeSnapshot = true;

   public static final long DEEP_OVERHEAD = AbstractMemStore.DEEP_OVERHEAD
       + 6 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline,
@@ -161,12 +160,7 @@ public class CompactingMemStore extends AbstractMemStore {
       stopCompaction();
       pushActiveToPipeline(this.active);
       snapshotId = EnvironmentEdgeManager.currentTime();
-      // in both cases whatever is pushed to snapshot is cleared from the pipeline
-      if (compositeSnapshot) {
-        pushPipelineToSnapshot();
-      } else {
-        pushTailToSnapshot();
-      }
+      pushTailToSnapshot();
     }
     return new MemStoreSnapshot(snapshotId, this.snapshot);
   }
@@ -179,13 +173,8 @@ public class CompactingMemStore extends AbstractMemStore {
   public MemstoreSize getFlushableSize() {
     MemstoreSize snapshotSize = getSnapshotSize();
     if (snapshotSize.getDataSize() == 0) {
-      // if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed
-      if (compositeSnapshot) {
-        snapshotSize = pipeline.getPipelineSize();
-        snapshotSize.incMemstoreSize(this.active.keySize(), this.active.heapOverhead());
-      } else {
-        snapshotSize = pipeline.getTailSize();
-      }
+      // if snapshot is empty the tail of the pipeline is flushed
+      snapshotSize = pipeline.getTailSize();
     }
     return snapshotSize.getDataSize() > 0 ? snapshotSize
         : new MemstoreSize(this.active.keySize(), this.active.heapOverhead());
@@ -224,28 +213,16 @@ public class CompactingMemStore extends AbstractMemStore {
     }
   }

-  // the getSegments() method is used for tests only
-  @VisibleForTesting
   @Override
   public List<Segment> getSegments() {
     List<Segment> pipelineList = pipeline.getSegments();
     List<Segment> list = new ArrayList<Segment>(pipelineList.size() + 2);
     list.add(this.active);
     list.addAll(pipelineList);
-    list.addAll(this.snapshot.getAllSegments());
+    list.add(this.snapshot);
     return list;
   }

-  // the following three methods allow to manipulate the settings of composite snapshot
-  public void setCompositeSnapshot(boolean useCompositeSnapshot) {
-    this.compositeSnapshot = useCompositeSnapshot;
-  }
-
-  public boolean isCompositeSnapshot() {
-    return this.compositeSnapshot;
-  }
-
   public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result,
       boolean merge) {
     return pipeline.swap(versionedList, result, !merge);
@@ -285,20 +262,18 @@ public class CompactingMemStore extends AbstractMemStore {
    * Scanners are ordered from 0 (oldest) to newest in increasing order.
    */
   public List<KeyValueScanner> getScanners(long readPt) throws IOException {
-    int order = 1; // for active segment
-    order += pipeline.size(); // for all segments in the pipeline
-    order += snapshot.getNumOfSegments(); // for all segments in the snapshot
-    // TODO: check alternatives to using this order
-    // The list of elements in pipeline + the active element + the snapshot segments
-    // The order is the Segment ordinal
-    List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(order);
-    list.add(this.active.getScanner(readPt, order));
-    order--;
-    list.addAll(pipeline.getScanners(readPt,order));
-    order -= pipeline.size();
-    list.addAll(snapshot.getScanners(readPt,order));
-    return Collections.<KeyValueScanner>singletonList(new MemStoreScanner(getComparator(), list));
+    List<Segment> pipelineList = pipeline.getSegments();
+    long order = pipelineList.size();
+    // The list of elements in pipeline + the active element + the snapshot segment
+    // TODO : This will change when the snapshot is made of more than one element
+    List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(pipelineList.size() + 2);
+    list.add(this.active.getScanner(readPt, order + 1));
+    for (Segment item : pipelineList) {
+      list.add(item.getScanner(readPt, order));
+      order--;
+    }
+    list.add(this.snapshot.getScanner(readPt, order));
+    return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(getComparator(), list));
   }

   /**
@@ -405,14 +380,6 @@ public class CompactingMemStore extends AbstractMemStore {
     }
   }

-  private void pushPipelineToSnapshot() {
-    List<ImmutableSegment> segments = pipeline.drain();
-    if (!segments.isEmpty()) {
-      this.snapshot =
-          SegmentFactory.instance().createCompositeImmutableSegment(getComparator(),segments);
-    }
-  }
-
   private RegionServicesForStores getRegionServices() {
     return regionServices;
   }
@@ -460,6 +427,24 @@ public class CompactingMemStore extends AbstractMemStore {
     compactor.initiateAction(compactionType);
   }

+  /**
+   * @param cell Find the row that comes after this one. If null, we return the
+   * first.
+   * @return Next row or null if none found.
+   */
+  Cell getNextRow(final Cell cell) {
+    Cell lowest = null;
+    List<Segment> segments = getSegments();
+    for (Segment segment : segments) {
+      if (lowest == null) {
+        lowest = getNextRow(cell, segment.getCellSet());
+      } else {
+        lowest = getLowest(lowest, getNextRow(cell, segment.getCellSet()));
+      }
+    }
+    return lowest;
+  }
+
   // debug method
   public void debug() {
     String msg = "active size=" + this.active.keySize();

CompactionPipeline.java (View File)

@@ -18,7 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;

-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -78,19 +77,6 @@ public class CompactionPipeline {
     }
   }

-  public List<ImmutableSegment> drain() {
-    int drainSize = pipeline.size();
-    List<ImmutableSegment> result = new ArrayList<ImmutableSegment>(drainSize);
-    synchronized (pipeline){
-      version++;
-      for(int i=0; i<drainSize; i++) {
-        ImmutableSegment segment = this.pipeline.removeFirst();
-        result.add(i,segment);
-      }
-      return result;
-    }
-  }
-
   public VersionedSegmentsList getVersionedList() {
     synchronized (pipeline){
       LinkedList<ImmutableSegment> segmentList = new LinkedList<ImmutableSegment>(pipeline);
@@ -207,7 +193,8 @@ public class CompactionPipeline {
   public List<Segment> getSegments() {
     synchronized (pipeline){
-      return new LinkedList<Segment>(pipeline);
+      List<Segment> res = new LinkedList<Segment>(pipeline);
+      return res;
     }
   }
@@ -215,18 +202,6 @@ public class CompactionPipeline {
     return pipeline.size();
   }

-  public List<KeyValueScanner> getScanners(long readPoint, long order) {
-    List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(this.pipeline.size());
-    for (Segment segment : this.pipeline) {
-      scanners.add(segment.getScanner(readPoint, order));
-      // The order is the Segment ordinal
-      order--;
-      assert order>=0; // order should never be negative so this is just a sanity check
-    }
-    return scanners;
-  }
-
   public long getMinSequenceId() {
     long minSequenceId = Long.MAX_VALUE;
     if (!isEmpty()) {
@@ -240,11 +215,6 @@ public class CompactionPipeline {
     return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead());
   }

-  public MemstoreSize getPipelineSize() {
-    if (isEmpty()) return MemstoreSize.EMPTY_SIZE;
-    return new MemstoreSize(getSegmentsKeySize(pipeline), getSegmentsHeapOverhead(pipeline));
-  }
-
   private void swapSuffix(List<ImmutableSegment> suffix, ImmutableSegment segment,
       boolean closeSegmentsInSuffix) {
     version++;

CompositeImmutableSegment.java (deleted) (View File)

@@ -1,352 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.SortedSet;
/**
* The CompositeImmutableSegments is created as a collection of ImmutableSegments and supports
* the interface of a single ImmutableSegments.
* The CompositeImmutableSegments is planned to be used only as a snapshot,
* thus only relevant interfaces are supported
*/
@InterfaceAudience.Private
public class CompositeImmutableSegment extends ImmutableSegment {
private final List<ImmutableSegment> segments;
private final CellComparator comparator;
// CompositeImmutableSegment is used for snapshots and snapshot should
// support getTimeRangeTracker() interface.
// Thus we hold a constant TRT build in the construction time from TRT of the given segments.
private final TimeRangeTracker timeRangeTracker;
private long keySize = 0;
// This scanner need to be remembered in order to close it when the snapshot is cleared.
// Initially CollectionBackedScanner didn't raise the scanner counters thus there was no
// need to close it. Now when MemStoreScanner is used instead we need to decrease the
// scanner counters.
private KeyValueScanner flushingScanner = null;
public CompositeImmutableSegment(CellComparator comparator, List<ImmutableSegment> segments) {
super(comparator);
this.comparator = comparator;
this.segments = segments;
this.timeRangeTracker = new TimeRangeTracker();
for (ImmutableSegment s : segments) {
this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMax());
this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMin());
this.keySize += s.keySize();
}
}
@VisibleForTesting
public List<Segment> getAllSegments() {
return new LinkedList<Segment>(segments);
}
public long getNumOfSegments() {
return segments.size();
}
/**
* Builds a special scanner for the MemStoreSnapshot object that is different than the
* general segment scanner.
* @return a special scanner for the MemStoreSnapshot object
*/
public KeyValueScanner getKeyValueScanner() {
KeyValueScanner scanner;
List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(segments.size());
for (ImmutableSegment s : segments) {
list.add(s.getScanner(Long.MAX_VALUE));
}
try {
scanner = new MemStoreScanner(getComparator(), list);
} catch (IOException ie) {
throw new IllegalStateException(ie);
}
flushingScanner = scanner;
return scanner;
}
@Override
public List<KeyValueScanner> getScanners(long readPoint, long order) {
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(this.segments.size());
for (Segment segment : this.segments) {
scanners.add(segment.getScanner(readPoint, order));
// The order is the Segment ordinal
order--;
// order should never be negative so this is just a sanity check
order = (order<0) ? 0 : order;
}
return scanners;
}
/**
* @return whether the segment has any cells
*/
public boolean isEmpty() {
for (ImmutableSegment s : segments) {
if (!s.isEmpty()) return false;
}
return true;
}
/**
* @return number of cells in segment
*/
public int getCellsCount() {
int result = 0;
for (ImmutableSegment s : segments) {
result += s.getCellsCount();
}
return result;
}
/**
* @return the first cell in the segment that has equal or greater key than the given cell
*/
public Cell getFirstAfter(Cell cell) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
/**
* Closing a segment before it is being discarded
*/
public void close() {
if (flushingScanner != null) {
flushingScanner.close();
flushingScanner = null;
}
for (ImmutableSegment s : segments) {
s.close();
}
}
/**
* If the segment has a memory allocator the cell is being cloned to this space, and returned;
* otherwise the given cell is returned
* @return either the given cell or its clone
*/
public Cell maybeCloneWithAllocator(Cell cell) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
public boolean shouldSeek(Scan scan, long oldestUnexpiredTS){
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
public long getMinTimestamp(){
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
/**
* Creates the scanner for the given read point
* @return a scanner for the given read point
*/
public KeyValueScanner getScanner(long readPoint) {
KeyValueScanner resultScanner;
List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(segments.size());
for (ImmutableSegment s : segments) {
list.add(s.getScanner(readPoint));
}
try {
resultScanner = new MemStoreScanner(getComparator(), list);
} catch (IOException ie) {
throw new IllegalStateException(ie);
}
return resultScanner;
}
/**
* Creates the scanner for the given read point, and a specific order in a list
* @return a scanner for the given read point
*/
public KeyValueScanner getScanner(long readPoint, long order) {
KeyValueScanner resultScanner;
List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(segments.size());
for (ImmutableSegment s : segments) {
list.add(s.getScanner(readPoint,order));
}
try {
resultScanner = new MemStoreScanner(getComparator(), list);
} catch (IOException ie) {
throw new IllegalStateException(ie);
}
return resultScanner;
}
public boolean isTagsPresent() {
for (ImmutableSegment s : segments) {
if (s.isTagsPresent()) return true;
}
return false;
}
public void incScannerCount() {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
public void decScannerCount() {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
/**
* Setting the CellSet of the segment - used only for flat immutable segment for setting
* immutable CellSet after its creation in immutable segment constructor
* @return this object
*/
protected CompositeImmutableSegment setCellSet(CellSet cellSetOld, CellSet cellSetNew) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
/**
* @return Sum of all cell's size.
*/
public long keySize() {
return this.keySize;
}
/**
* @return The heap overhead of this segment.
*/
public long heapOverhead() {
long result = 0;
for (ImmutableSegment s : segments) {
result += s.heapOverhead();
}
return result;
}
/**
* Updates the heap size counter of the segment by the given delta
*/
protected void incSize(long delta, long heapOverhead) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
protected void incHeapOverheadSize(long delta) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
public long getMinSequenceId() {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
public TimeRangeTracker getTimeRangeTracker() {
return this.timeRangeTracker;
}
//*** Methods for SegmentsScanner
public Cell last() {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
public Iterator<Cell> iterator() {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
public SortedSet<Cell> headSet(Cell firstKeyOnRow) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
public int compare(Cell left, Cell right) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
public int compareRows(Cell left, Cell right) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
/**
* @return a set of all cells in the segment
*/
protected CellSet getCellSet() {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
/**
* Returns the Cell comparator used by this segment
* @return the Cell comparator used by this segment
*/
protected CellComparator getComparator() {
return comparator;
}
protected void internalAdd(Cell cell, boolean mslabUsed, MemstoreSize memstoreSize) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed,
MemstoreSize memstoreSize) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
protected long heapOverheadChange(Cell cell, boolean succ) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
/**
* Returns a subset of the segment cell set, which starts with the given cell
* @param firstCell a cell in the segment
* @return a subset of the segment cell set, which starts with the given cell
*/
protected SortedSet<Cell> tailSet(Cell firstCell) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
// Debug methods
/**
* Dumps all cells of the segment into the given log
*/
void dump(Log log) {
for (ImmutableSegment s : segments) {
s.dump(log);
}
}
@Override
public String toString() {
StringBuilder sb =
new StringBuilder("This is CompositeImmutableSegment and those are its segments:: ");
for (ImmutableSegment s : segments) {
sb.append(s.toString());
}
return sb.toString();
}
}
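The constructor of the deleted class above aggregates its sub-segments: key sizes are summed, and the composite time range tracker absorbs each sub-segment's min and max timestamps. A self-contained sketch of that aggregation (the Seg class and main method are hypothetical stand-ins; the real code uses ImmutableSegment and TimeRangeTracker):

// Hypothetical stand-in for the aggregation in the deleted
// CompositeImmutableSegment constructor: sum key sizes, union time ranges.
import java.util.Arrays;
import java.util.List;

public class CompositeAggregationDemo {
  static final class Seg {
    final long keySize, minTs, maxTs;
    Seg(long keySize, long minTs, long maxTs) {
      this.keySize = keySize; this.minTs = minTs; this.maxTs = maxTs;
    }
  }

  public static void main(String[] args) {
    List<Seg> segments = Arrays.asList(new Seg(100, 5, 9), new Seg(50, 2, 7));
    long keySize = 0, min = Long.MAX_VALUE, max = Long.MIN_VALUE;
    for (Seg s : segments) {        // mirrors the constructor's loop above
      keySize += s.keySize;
      min = Math.min(min, s.minTs); // includeTimestamp(getMin())
      max = Math.max(max, s.maxTs); // includeTimestamp(getMax())
    }
    // prints: keySize=150 range=[2,9]
    System.out.println("keySize=" + keySize + " range=[" + min + "," + max + "]");
  }
}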

DefaultMemStore.java (View File)

@@ -25,7 +25,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;

-import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -128,20 +127,30 @@ public class DefaultMemStore extends AbstractMemStore {
   public List<KeyValueScanner> getScanners(long readPt) throws IOException {
     List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(2);
     list.add(this.active.getScanner(readPt, 1));
-    list.addAll(this.snapshot.getScanners(readPt, 0));
-    return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(getComparator(), list));
+    list.add(this.snapshot.getScanner(readPt, 0));
+    return Collections.<KeyValueScanner> singletonList(
+        new MemStoreScanner(getComparator(), list));
   }

-  // the getSegments() method is used for tests only
-  @VisibleForTesting
   @Override
-  protected List<Segment> getSegments() {
+  protected List<Segment> getSegments() throws IOException {
     List<Segment> list = new ArrayList<Segment>(2);
     list.add(this.active);
-    list.addAll(this.snapshot.getAllSegments());
+    list.add(this.snapshot);
     return list;
   }

+  /**
+   * @param cell Find the row that comes after this one. If null, we return the
+   * first.
+   * @return Next row or null if none found.
+   */
+  Cell getNextRow(final Cell cell) {
+    return getLowest(
+        getNextRow(cell, this.active.getCellSet()),
+        getNextRow(cell, this.snapshot.getCellSet()));
+  }
+
   @Override public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) {
   }

HRegion.java (View File)

@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver;
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
 import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -6484,8 +6483,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       final Configuration conf, final HTableDescriptor hTableDescriptor,
       final WAL wal, final boolean initialize)
   throws IOException {
-    LOG.info("creating HRegion " + info.getTable().getNameAsString() + " HTD == " + hTableDescriptor
-        + " RootDir = " + rootDir +
+    LOG.info("creating HRegion " + info.getTable().getNameAsString()
+        + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
         " Table name == " + info.getTable().getNameAsString());
     FileSystem fs = FileSystem.get(conf);
     Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());

ImmutableSegment.java (View File)

@@ -30,10 +30,6 @@ import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.CollectionBackedScanner;

 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;

 /**
  * ImmutableSegment is an abstract class that extends the API supported by a {@link Segment},
@@ -72,14 +68,6 @@ public class ImmutableSegment extends Segment {
   }

   ///////////////////// CONSTRUCTORS /////////////////////
-  /**------------------------------------------------------------------------
-   * Empty C-tor to be used only for CompositeImmutableSegment
-   */
-  protected ImmutableSegment(CellComparator comparator) {
-    super(comparator);
-    this.timeRange = null;
-  }
-
   /**------------------------------------------------------------------------
    * Copy C-tor to be used when new ImmutableSegment is being built from a Mutable one.
    * This C-tor should be used when active MutableSegment is pushed into the compaction
@@ -154,15 +142,6 @@ public class ImmutableSegment extends Segment {
     return this.timeRange.getMin();
   }

-  public long getNumOfSegments() {
-    return 1;
-  }
-
-  public List<Segment> getAllSegments() {
-    List<Segment> res = new ArrayList<Segment>(Arrays.asList(this));
-    return res;
-  }
-
   /**------------------------------------------------------------------------
    * Change the CellSet of this ImmutableSegment from one based on ConcurrentSkipListMap to one
    * based on CellArrayMap.
@@ -253,7 +232,7 @@ public class ImmutableSegment extends Segment {
     Cell curCell;
     int idx = 0;
     // create this segment scanner with maximal possible read point, to go over all Cells
-    KeyValueScanner segmentScanner = this.getScanner(Long.MAX_VALUE);
+    SegmentScanner segmentScanner = this.getScanner(Long.MAX_VALUE);
     try {
       while ((curCell = segmentScanner.next()) != null) {

MemStoreCompactor.java (View File)

@@ -56,7 +56,7 @@ public class MemStoreCompactor {

   // The upper bound for the number of segments we store in the pipeline prior to merging.
   // This constant is subject to further experimentation.
-  private static final int THRESHOLD_PIPELINE_SEGMENTS = 30; // stands here for infinity
+  private static final int THRESHOLD_PIPELINE_SEGMENTS = 1;

   private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class);
@@ -276,8 +276,6 @@ public class MemStoreCompactor {
       case NONE: action = Action.NOOP;
         break;
       case BASIC: action = Action.MERGE;
-        // if multiple segments appear in the pipeline flush them to the disk later together
-        compactingMemStore.setCompositeSnapshot(true);
         break;
       case EAGER: action = Action.COMPACT;
         break;

MemstoreSize.java (View File)

@@ -25,32 +25,19 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 @InterfaceAudience.Private
 public class MemstoreSize {

+  static final MemstoreSize EMPTY_SIZE = new MemstoreSize();
+
   private long dataSize;
   private long heapOverhead;
-  final private boolean isEmpty;
-
-  static final MemstoreSize EMPTY_SIZE = new MemstoreSize(true);

   public MemstoreSize() {
     dataSize = 0;
     heapOverhead = 0;
-    isEmpty = false;
-  }
-
-  public MemstoreSize(boolean isEmpty) {
-    dataSize = 0;
-    heapOverhead = 0;
-    this.isEmpty = isEmpty;
-  }
-
-  public boolean isEmpty() {
-    return isEmpty;
   }

   public MemstoreSize(long dataSize, long heapOverhead) {
     this.dataSize = dataSize;
     this.heapOverhead = heapOverhead;
-    this.isEmpty = false;
   }

   public void incMemstoreSize(long dataSize, long heapOverhead) {
@@ -74,13 +61,11 @@ public class MemstoreSize {
   }

   public long getDataSize() {
-    return isEmpty ? 0 : dataSize;
+    return dataSize;
   }

   public long getHeapOverhead() {
-    return isEmpty ? 0 : heapOverhead;
+    return heapOverhead;
   }

   @Override
@@ -89,7 +74,7 @@ public class MemstoreSize {
       return false;
     }
     MemstoreSize other = (MemstoreSize) obj;
-    return getDataSize() == other.dataSize && getHeapOverhead() == other.heapOverhead;
+    return this.dataSize == other.dataSize && this.heapOverhead == other.heapOverhead;
   }

   @Override
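Worth noting on the MemstoreSize hunks above: the class is mutable (via incMemstoreSize), yet EMPTY_SIZE is a shared static instance; the removed isEmpty flag pinned the constant's getters to zero so it could never report a nonzero size. A minimal sketch of the hazard the flag guarded against, condensed from the hunk above (the increment body and the main method are hypothetical illustrations, not HBase code):

// Post-revert shape of MemstoreSize, condensed from the hunk above;
// the demo main is hypothetical, not part of HBase.
class MemstoreSize {
  static final MemstoreSize EMPTY_SIZE = new MemstoreSize();

  private long dataSize = 0;
  private long heapOverhead = 0;

  public void incMemstoreSize(long dataSize, long heapOverhead) {
    this.dataSize += dataSize;
    this.heapOverhead += heapOverhead;
  }

  public long getDataSize() { return dataSize; }

  public static void main(String[] args) {
    // If any caller ever increments the shared constant...
    MemstoreSize.EMPTY_SIZE.incMemstoreSize(1024, 64);
    // ...every later "is it empty?" comparison is silently wrong:
    System.out.println(MemstoreSize.EMPTY_SIZE.getDataSize()); // 1024, not 0
  }
}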

Segment.java (View File)

@@ -18,9 +18,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;

-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.SortedSet;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -66,15 +64,6 @@ public abstract class Segment {
   protected final TimeRangeTracker timeRangeTracker;
   protected volatile boolean tagsPresent;

-  // Empty constructor to be used when Segment is used as interface,
-  // and there is no need in true Segments state
-  protected Segment(CellComparator comparator) {
-    this.comparator = comparator;
-    this.dataSize = new AtomicLong(0);
-    this.heapOverhead = new AtomicLong(0);
-    this.timeRangeTracker = new TimeRangeTracker();
-  }
-
   // This constructor is used to create empty Segments.
   protected Segment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
     this.cellSet.set(cellSet);
@@ -102,7 +91,7 @@ public abstract class Segment {
    * Creates the scanner for the given read point
    * @return a scanner for the given read point
    */
-  public KeyValueScanner getScanner(long readPoint) {
+  public SegmentScanner getScanner(long readPoint) {
     return new SegmentScanner(this, readPoint);
   }

@@ -110,16 +99,10 @@ public abstract class Segment {
    * Creates the scanner for the given read point, and a specific order in a list
    * @return a scanner for the given read point
    */
-  public KeyValueScanner getScanner(long readPoint, long order) {
+  public SegmentScanner getScanner(long readPoint, long order) {
     return new SegmentScanner(this, readPoint, order);
   }

-  public List<KeyValueScanner> getScanners(long readPoint, long order) {
-    List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(1);
-    scanners.add(getScanner(readPoint, order));
-    return scanners;
-  }
-
   /**
    * @return whether the segment has any cells
    */

SegmentFactory.java (View File)

@@ -47,13 +47,6 @@ public final class SegmentFactory {
     return new ImmutableSegment(comparator, iterator, MemStoreLAB.newInstance(conf));
   }

-  // create composite immutable segment from a list of segments
-  public CompositeImmutableSegment createCompositeImmutableSegment(
-      final CellComparator comparator, List<ImmutableSegment> segments) {
-    return new CompositeImmutableSegment(comparator, segments);
-  }
-
   // create new flat immutable segment from compacting old immutable segments
   public ImmutableSegment createImmutableSegmentByCompaction(final Configuration conf,
       final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
@@ -109,9 +102,6 @@ public final class SegmentFactory {

   private MemStoreLAB getMergedMemStoreLAB(Configuration conf, List<ImmutableSegment> segments) {
     List<MemStoreLAB> mslabs = new ArrayList<MemStoreLAB>();
-    if (!conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) {
-      return null;
-    }
     for (ImmutableSegment segment : segments) {
       mslabs.add(segment.getMemStoreLAB());
     }

TestCompactingMemStore.java (View File)

@@ -137,7 +137,6 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     this.memstore = new CompactingMemStore(HBaseConfiguration.create(),
         CellComparator.COMPARATOR, store, regionServicesForStores,
         HColumnDescriptor.MemoryCompaction.EAGER);
-
     this.memstore.add(kv1.clone(), null);
     // As compaction is starting in the background the repetition
     // of the k1 might be removed BUT the scanners created earlier
@@ -178,9 +177,6 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     // Add more versions to make it a little more interesting.
     Thread.sleep(1);
     addRows(this.memstore);
-
-    ((CompactingMemStore)this.memstore).setCompositeSnapshot(true);
-
     Cell closestToEmpty = ((CompactingMemStore)this.memstore).getNextRow(KeyValue.LOWESTKEY);
     assertTrue(CellComparator.COMPARATOR.compareRows(closestToEmpty,
         new KeyValue(Bytes.toBytes(0), System.currentTimeMillis())) == 0);
@@ -281,9 +277,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     this.memstore.upsert(l, 2, null);// readpoint is 2
     MemstoreSize newSize = this.memstore.size();
-    assertTrue("\n<<< The old size is " + oldSize.getDataSize() + " and the new size is "
-        + newSize.getDataSize() + "\n",
-        newSize.getDataSize() > oldSize.getDataSize());
+    assert (newSize.getDataSize() > oldSize.getDataSize());
     //The kv1 should be removed.
     assert (memstore.getActive().getCellsCount() == 2);

TestDefaultMemStore.java (View File)

@@ -65,6 +65,8 @@ import org.junit.rules.TestName;
 import org.junit.rules.TestRule;

 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -178,10 +180,6 @@ public class TestDefaultMemStore {
     // Now assert can count same number even if a snapshot mid-scan.
     s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
     count = 0;
-
-    // assertTrue("\n<<< The memstore scanners without snapshot are: \n" + memstorescanners
-    //     + "\n",false);
-
     try {
       while (s.next(result)) {
         LOG.info(result);
@@ -209,10 +207,8 @@ public class TestDefaultMemStore {
     s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
     count = 0;
     int snapshotIndex = 5;
-
     try {
       while (s.next(result)) {
         LOG.info(result);
-
         // Assert the stuff is coming out in right order.
         assertTrue(CellUtil.matchingRow(result.get(0), Bytes.toBytes(count)));
@@ -220,7 +216,6 @@ public class TestDefaultMemStore {
         assertEquals("count=" + count + ", result=" + result, rowCount, result.size());
         count++;
         if (count == snapshotIndex) {
-
           MemStoreSnapshot snapshot = this.memstore.snapshot();
           this.memstore.clearSnapshot(snapshot.getId());
           // Added more rows into kvset. But the scanner wont see these rows.
@@ -232,8 +227,7 @@ public class TestDefaultMemStore {
     } finally {
       s.close();
     }
-    assertEquals("\n<<< The row count is " + rowCount + " and the iteration count is " + count,
-        rowCount, count);
+    assertEquals(rowCount, count);
   }

   /**

TestWalAndCompactingMemStoreFlush.java (View File)

@@ -22,7 +22,13 @@ import java.util.Arrays;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -32,7 +38,6 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -50,48 +55,40 @@ public class TestWalAndCompactingMemStoreFlush {
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");
-  public static final TableName TABLENAME =
-      TableName.valueOf("TestWalAndCompactingMemStoreFlush", "t1");
-  public static final byte[][] FAMILIES =
-      { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };
+  public static final TableName TABLENAME = TableName.valueOf("TestWalAndCompactingMemStoreFlush",
+      "t1");
+  public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
+      Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };

   public static final byte[] FAMILY1 = FAMILIES[0];
   public static final byte[] FAMILY2 = FAMILIES[1];
   public static final byte[] FAMILY3 = FAMILIES[2];

   private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException {
-    MemstoreSize memstrsize1 = MemstoreSize.EMPTY_SIZE;
-    assertEquals(memstrsize1.getDataSize(), 0);
-    assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0);
-    int i = 0;
+    int i=0;
     HTableDescriptor htd = new HTableDescriptor(TABLENAME);
     for (byte[] family : FAMILIES) {
       HColumnDescriptor hcd = new HColumnDescriptor(family);
       // even column families are going to have compacted memstore
       if(i%2 == 0) {
         hcd.setInMemoryCompaction(HColumnDescriptor.MemoryCompaction.valueOf(
             conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY)));
       } else {
         hcd.setInMemoryCompaction(HColumnDescriptor.MemoryCompaction.NONE);
       }
       htd.addFamily(hcd);
       i++;
     }
-    assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0);
     HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false);
     Path path = new Path(DIR, callingMethod);
-    assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0);
-    HRegion result = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd);
-    assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0);
-    return result;
+    return HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd);
   }

   // A helper function to create puts.
   private Put createPut(int familyNum, int putNum) {
     byte[] qf = Bytes.toBytes("q" + familyNum);
     byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
     byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
     Put p = new Put(row);
@@ -101,7 +98,7 @@ public class TestWalAndCompactingMemStoreFlush {
   // A helper function to create double puts, so something can be compacted later.
   private Put createDoublePut(int familyNum, int putNum) {
     byte[] qf = Bytes.toBytes("q" + familyNum);
     byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
     byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
     Put p = new Put(row);
@@ -125,21 +122,16 @@ public class TestWalAndCompactingMemStoreFlush {
     byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
     assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family));
     assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum),
         r.getFamilyMap(family).get(qf));
     assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum),
         Arrays.equals(r.getFamilyMap(family).get(qf), val));
   }

-  @Before public void setUp() throws Exception {
-    assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0);
-  }
-
-  // test selective flush with data-compaction
   @Test(timeout = 180000)
   public void testSelectiveFlushWithEager() throws IOException {
     // Set up the configuration
     Configuration conf = HBaseConfiguration.create();
     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
     conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
         FlushNonSloppyStoresFirstPolicy.class.getName());
@@ -183,14 +175,17 @@ public class TestWalAndCompactingMemStoreFlush {
     MemstoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getSizeOfMemStore();

     // Get the overall smallest LSN in the region's memstores.
-    long smallestSeqInRegionCurrentMemstorePhaseI =
-        getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+    long smallestSeqInRegionCurrentMemstorePhaseI = getWAL(region)
+        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());

     String s = "\n\n----------------------------------\n"
-        + "Upon initial insert and before any flush, size of CF1 is:" + cf1MemstoreSizePhaseI + ", is CF1 compacted memstore?:"
-        + region.getStore(FAMILY1).isSloppyMemstore() + ". Size of CF2 is:" + cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:"
-        + region.getStore(FAMILY2).isSloppyMemstore() + ". Size of CF3 is:" + cf3MemstoreSizePhaseI
-        + ", is CF3 compacted memstore?:" + region.getStore(FAMILY3).isSloppyMemstore() + "\n";
+        + "Upon initial insert and before any flush, size of CF1 is:"
+        + cf1MemstoreSizePhaseI + ", is CF1 compacted memstore?:"
+        + region.getStore(FAMILY1).isSloppyMemstore() + ". Size of CF2 is:"
+        + cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:"
+        + region.getStore(FAMILY2).isSloppyMemstore() + ". Size of CF3 is:"
+        + cf3MemstoreSizePhaseI + ", is CF3 compacted memstore?:"
+        + region.getStore(FAMILY3).isSloppyMemstore() + "\n";

     // The overall smallest LSN in the region's memstores should be the same as
     // the LSN of the smallest edit in CF1
@@ -205,12 +200,12 @@ public class TestWalAndCompactingMemStoreFlush {

     // The total memstore size should be the same as the sum of the sizes of
     // memstores of CF1, CF2 and CF3.
-    String msg = "totalMemstoreSize=" + totalMemstoreSize +
-        " cf1MemstoreSizePhaseI=" + cf1MemstoreSizePhaseI +
-        " cf2MemstoreSizePhaseI=" + cf2MemstoreSizePhaseI +
-        " cf3MemstoreSizePhaseI=" + cf3MemstoreSizePhaseI;
-    assertEquals(msg, totalMemstoreSize,
-        cf1MemstoreSizePhaseI.getDataSize() + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
+    String msg = "totalMemstoreSize="+totalMemstoreSize +
+        " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI +
+        " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI +
+        " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ;
+    assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
+        + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());

     // Flush!!!!!!!!!!!!!!!!!!!!!!
     // We have big compacting memstore CF1 and two small memstores:
@@ -230,8 +225,8 @@ public class TestWalAndCompactingMemStoreFlush {
     MemstoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getSizeOfMemStore();
     MemstoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getSizeOfMemStore();

-    long smallestSeqInRegionCurrentMemstorePhaseII =
-        getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+    long smallestSeqInRegionCurrentMemstorePhaseII = getWAL(region)
+        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
     // Find the smallest LSNs for edits wrt to each CF.
     long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
     long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
@@ -265,20 +260,16 @@ public class TestWalAndCompactingMemStoreFlush {
     s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseII
         + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseII + ", " +
-        "the smallest sequence in CF2:" + smallestSeqCF2PhaseII + ", the smallest sequence in CF3:"
-        + smallestSeqCF3PhaseII + "\n";
+        "the smallest sequence in CF2:"
+        + smallestSeqCF2PhaseII +", the smallest sequence in CF3:" + smallestSeqCF3PhaseII + "\n";

     // How much does the CF1 memstore occupy? Will be used later.
     MemstoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getSizeOfMemStore();
     long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);

     s = s + "----After more puts into CF1 its size is:" + cf1MemstoreSizePhaseIII
-        + ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n"
-        + "The sizes of snapshots are cf1: " + region.getStore(FAMILY1).getFlushedCellsSize()
-        + ", cf2: " + region.getStore(FAMILY2).getFlushedCellsSize() + ", cf3: " + region
-        .getStore(FAMILY3).getFlushedCellsSize() + ", cf4: " + region.getStore(FAMILIES[4])
-        .getFlushedCellsSize() + "; the entire region size is: " + region.getMemstoreSize() + "\n";
-    ;
+        + ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n" ;

     // Flush!!!!!!!!!!!!!!!!!!!!!!
     // Flush again, CF1 is flushed to disk
@@ -291,22 +282,21 @@ public class TestWalAndCompactingMemStoreFlush {
     MemstoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getSizeOfMemStore();
     MemstoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getSizeOfMemStore();

-    long smallestSeqInRegionCurrentMemstorePhaseIV =
-        getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+    long smallestSeqInRegionCurrentMemstorePhaseIV = getWAL(region)
+        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
     long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
     long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
     long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);

     s = s + "----After SECOND FLUSH, CF1 size is:" + cf1MemstoreSizePhaseIV + ", CF2 size is:"
-        + cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV + "\n" + "The sizes of snapshots are cf1: " + region.getStore(FAMILY1).getFlushedCellsSize()
-        + ", cf2: " + region.getStore(FAMILY2).getFlushedCellsSize() + ", cf3: " + region
-        .getStore(FAMILY3).getFlushedCellsSize() + ", cf4: " + region.getStore(FAMILIES[4])
-        .getFlushedCellsSize() + "\n";
+        + cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV
+        + "\n";

     s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV
         + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " +
-        "the smallest sequence in CF2:" + smallestSeqCF2PhaseIV + ", the smallest sequence in CF3:"
-        + smallestSeqCF3PhaseIV + "\n" + "the entire region size is: " + region.getMemstoreSize() + "\n";
+        "the smallest sequence in CF2:"
+        + smallestSeqCF2PhaseIV +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIV
+        + "\n";

     // CF1's pipeline component (inserted before first flush) should be flushed to disk
     // CF2 should be flushed to disk
@@ -331,21 +321,13 @@ public class TestWalAndCompactingMemStoreFlush {
     MemstoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getSizeOfMemStore();
     MemstoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getSizeOfMemStore();
     MemstoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getSizeOfMemStore();
-    long smallestSeqInRegionCurrentMemstorePhaseV =
-        getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
-    assertEquals(MemstoreSize.EMPTY_SIZE, cf1MemstoreSizePhaseV);
+    long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region)
+        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+    assertEquals(MemstoreSize.EMPTY_SIZE , cf1MemstoreSizePhaseV);
     assertEquals(MemstoreSize.EMPTY_SIZE, cf2MemstoreSizePhaseV);
     assertEquals(MemstoreSize.EMPTY_SIZE, cf3MemstoreSizePhaseV);
-    s = s + "----AFTER THIRD FLUSH, the entire region size is:" + region.getMemstoreSize()
-        + " (empty memstore size is " + MemstoreSize.EMPTY_SIZE
-        + "), while the sizes of each memstore are as following \ncf1: " + cf1MemstoreSizePhaseV
-        + ", cf2: " + cf2MemstoreSizePhaseV + ", cf3: " + cf3MemstoreSizePhaseV + ", cf4: " + region
-        .getStore(FAMILIES[4]).getSizeOfMemStore() + "\n"
-        + "The sizes of snapshots are cf1: " + region.getStore(FAMILY1).getFlushedCellsSize()
-        + ", cf2: " + region.getStore(FAMILY2).getFlushedCellsSize() + ", cf3: " + region.getStore(FAMILY3).getFlushedCellsSize()
-        + ", cf4: " + region.getStore(FAMILIES[4]).getFlushedCellsSize() + "\n";
     // What happens when we hit the memstore limit, but we are not able to find
     // any Column Family above the threshold?
     // In that case, we should flush all the CFs.
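The fallback the comment above describes is the decision a FlushNonSloppyStoresFirstPolicy-style policy makes. A simplified sketch of that decision follows; the selectStoresToFlush helper is hypothetical, not the actual policy code, and java.util.List/ArrayList/Collection imports are assumed:

    // Hypothetical sketch of the selective-flush decision the test exercises:
    // pick every store above the per-family lower bound; if none qualifies,
    // fall back to flushing all column families of the region.
    private Collection<Store> selectStoresToFlush(Collection<Store> stores, long lowerBound) {
      List<Store> picked = new ArrayList<>();
      for (Store store : stores) {
        if (store.getSizeOfMemStore().getDataSize() > lowerBound) {
          picked.add(store);
        }
      }
      return picked.isEmpty() ? stores : picked;  // no winner -> flush all CFs
    }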
@@ -363,22 +345,24 @@ public class TestWalAndCompactingMemStoreFlush {
     region.flush(false);
-    s = s + "----AFTER FOURTH FLUSH, The smallest sequence in region WAL is: "
+    s = s + "----AFTER THIRD AND FOURTH FLUSH, The smallest sequence in region WAL is: "
         + smallestSeqInRegionCurrentMemstorePhaseV
-        + ". After additional inserts and last flush, the entire region size is:" + region
-        .getMemstoreSize() + "\n----------------------------------\n";
+        + ". After additional inserts and last flush, the entire region size is:" + region
+        .getMemstoreSize()
+        + "\n----------------------------------\n";
     // Since we won't find any CF above the threshold, and hence no specific
     // store to flush, we should flush all the memstores
     // Also compacted memstores are flushed to disk.
-    assertEquals(s, 0, region.getMemstoreSize());
+    assertEquals(0, region.getMemstoreSize());
     System.out.println(s);
     HBaseTestingUtility.closeRegionAndWAL(region);
   }

   /*------------------------------------------------------------------------------*/
   /* Check the same as above but for index-compaction type of compacting memstore */
-  @Test(timeout = 180000) public void testSelectiveFlushWithIndexCompaction() throws IOException {
+  @Test(timeout = 180000)
+  public void testSelectiveFlushWithIndexCompaction() throws IOException {

     /*------------------------------------------------------------------------------*/
     /* SETUP */
@@ -395,7 +379,7 @@ public class TestWalAndCompactingMemStoreFlush {
     // Initialize the region
     Region region = initHRegion("testSelectiveFlushWithIndexCompaction", conf);
-    assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0);
     /*------------------------------------------------------------------------------*/
     /* PHASE I - insertions */
     // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
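The load described by the comment above is skewed on purpose, so that only CF1 crosses the flush lower bound. A condensed sketch using the test's own createPut(cf, i) helper (the test may use separate loops; this compresses them for readability):

    // Condensed sketch of the skewed load: CF1 gets 1200 puts, CF2 100, CF3 50.
    for (int i = 1; i <= 1200; i++) {
      region.put(createPut(1, i));
      if (i <= 100) {
        region.put(createPut(2, i));
      }
      if (i <= 50) {
        region.put(createPut(3, i));
      }
    }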
@@ -426,8 +410,8 @@ public class TestWalAndCompactingMemStoreFlush {
     MemstoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getSizeOfMemStore();
     MemstoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getSizeOfMemStore();
     // Get the overall smallest LSN in the region's memstores.
-    long smallestSeqInRegionCurrentMemstorePhaseI =
-        getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+    long smallestSeqInRegionCurrentMemstorePhaseI = getWAL(region)
+        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
     /*------------------------------------------------------------------------------*/
     /* PHASE I - validation */
@@ -443,8 +427,8 @@ public class TestWalAndCompactingMemStoreFlush {
     // The total memstore size should be the same as the sum of the sizes of
     // memstores of CF1, CF2 and CF3.
-    assertEquals(totalMemstoreSizePhaseI,
-        cf1MemstoreSizePhaseI.getDataSize() + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
+    assertEquals(totalMemstoreSizePhaseI, cf1MemstoreSizePhaseI.getDataSize()
+        + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
     /*------------------------------------------------------------------------------*/
     /* PHASE I - Flush */
@@ -475,8 +459,8 @@ public class TestWalAndCompactingMemStoreFlush {
     MemstoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getSizeOfMemStore();
     MemstoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getSizeOfMemStore();
     MemstoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getSizeOfMemStore();
-    long smallestSeqInRegionCurrentMemstorePhaseII =
-        getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+    long smallestSeqInRegionCurrentMemstorePhaseII = getWAL(region)
+        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
     // Find the smallest LSNs for edits wrt each CF.
     long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
     long totalMemstoreSizePhaseII = region.getMemstoreSize();
@@ -484,13 +468,13 @@ public class TestWalAndCompactingMemStoreFlush {
     /*------------------------------------------------------------------------------*/
     /* PHASE II - validation */
     // CF1 was flushed to memory, should be flattened and take less space
-    assertEquals(cf1MemstoreSizePhaseII.getDataSize(), cf1MemstoreSizePhaseI.getDataSize());
+    assertEquals(cf1MemstoreSizePhaseII.getDataSize() , cf1MemstoreSizePhaseI.getDataSize());
     assertTrue(cf1MemstoreSizePhaseII.getHeapOverhead() < cf1MemstoreSizePhaseI.getHeapOverhead());
     // CF2 should become empty
     assertEquals(MemstoreSize.EMPTY_SIZE, cf2MemstoreSizePhaseII);
     // verify that CF3 was flushed to memory and was not compacted (this is an approximation check)
    // had CF3 been compacted, it would be at most half the size, since every one of its keys was duplicated
-    assertEquals(cf3MemstoreSizePhaseII.getDataSize(), cf3MemstoreSizePhaseI.getDataSize());
+    assertEquals(cf3MemstoreSizePhaseII.getDataSize() , cf3MemstoreSizePhaseI.getDataSize());
     assertTrue(
         cf3MemstoreSizePhaseI.getHeapOverhead() / 2 < cf3MemstoreSizePhaseII.getHeapOverhead());
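The two Phase II checks above encode the difference between flattening and data compaction: flattening only rewrites a segment's index (skip-list to flat array) without touching cells, so data size stays the same while heap overhead drops; data compaction would also have dropped the duplicated cells. A schematic restatement with hypothetical sizes, purely illustrative:

    // Schematic: flattening vs. data compaction (hypothetical numbers).
    // before flattening: dataSize = 100 KB, heapOverhead = 40 KB (skip-list index)
    // after flattening:  dataSize = 100 KB, heapOverhead < 40 KB (flat cell array)
    // data compaction would additionally roughly halve dataSize here, because
    // every key was inserted twice; the assertEquals on getDataSize() rules that out.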
@@ -500,8 +484,8 @@ public class TestWalAndCompactingMemStoreFlush {
     // The total memstore size should be the same as the sum of the sizes of
     // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline
     // items in CF1/2
-    assertEquals(totalMemstoreSizePhaseII,
-        cf1MemstoreSizePhaseII.getDataSize() + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
+    assertEquals(totalMemstoreSizePhaseII, cf1MemstoreSizePhaseII.getDataSize()
+        + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
     /*------------------------------------------------------------------------------*/
     /*------------------------------------------------------------------------------*/
@@ -529,8 +513,8 @@ public class TestWalAndCompactingMemStoreFlush {
     // The total memstore size should be the same as the sum of the sizes of
     // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline
     // items in CF1/2
-    assertEquals(totalMemstoreSizePhaseIII,
-        cf1MemstoreSizePhaseIII.getDataSize() + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
+    assertEquals(totalMemstoreSizePhaseIII, cf1MemstoreSizePhaseIII.getDataSize()
+        + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
     /*------------------------------------------------------------------------------*/
     /* PHASE III - Flush */
@@ -546,8 +530,8 @@ public class TestWalAndCompactingMemStoreFlush {
     MemstoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getSizeOfMemStore();
     MemstoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getSizeOfMemStore();
     MemstoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getSizeOfMemStore();
-    long smallestSeqInRegionCurrentMemstorePhaseIV =
-        getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+    long smallestSeqInRegionCurrentMemstorePhaseIV = getWAL(region)
+        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
     long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
     /*------------------------------------------------------------------------------*/
@@ -577,8 +561,8 @@ public class TestWalAndCompactingMemStoreFlush {
     MemstoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getSizeOfMemStore();
     MemstoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getSizeOfMemStore();
     MemstoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getSizeOfMemStore();
-    long smallestSeqInRegionCurrentMemstorePhaseV =
-        getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+    long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region)
+        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
     long totalMemstoreSizePhaseV = region.getMemstoreSize();
     /*------------------------------------------------------------------------------*/
@@ -633,30 +617,22 @@ public class TestWalAndCompactingMemStoreFlush {
     HBaseTestingUtility.closeRegionAndWAL(region);
   }

-  // test WAL behavior together with selective flush while data-compaction
-  @Test(timeout = 180000) public void testDCwithWAL() throws IOException {
-    MemstoreSize checkSize = MemstoreSize.EMPTY_SIZE;
-    assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0);
+  @Test(timeout = 180000)
+  public void testSelectiveFlushAndWALinDataCompaction() throws IOException {
     // Set up the configuration
     Configuration conf = HBaseConfiguration.create();
     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
-    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
-        FlushNonSloppyStoresFirstPolicy.class.getName());
-    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushNonSloppyStoresFirstPolicy.class
+        .getName());
+    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 *
+        1024);
     conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
     // set memstore to do data compaction and not to use the speculative scan
     conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
         String.valueOf(HColumnDescriptor.MemoryCompaction.EAGER));
-    MemstoreSize memstrsize1 = MemstoreSize.EMPTY_SIZE;
-    assertEquals(MemstoreSize.EMPTY_SIZE.getDataSize(), 0);
     // Initialize the HRegion
     HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf);
-    MemstoreSize cf2MemstoreSizePhase0 = region.getStore(FAMILY2).getSizeOfMemStore();
-    MemstoreSize cf1MemstoreSizePhase0 = region.getStore(FAMILY1).getSizeOfMemStore();
     // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
     for (int i = 1; i <= 1200; i++) {
       region.put(createPut(1, i));
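The 0.5 set for IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY above governs when a CompactingMemStore moves its active segment into its compaction pipeline. Roughly, and only as a hedged sketch (the exact base size CompactingMemStore divides by, for example per store versus per region, may differ from this reading):

    // Hedged sketch of the in-memory flush trigger configured above: the
    // active segment flushes in memory once it reaches factor * flush size;
    // with the values in this test that is on the order of 0.5 * 300 KB.
    boolean shouldFlushInMemory(long activeSegmentDataSize) {
      long flushSize = 300 * 1024;  // HREGION_MEMSTORE_FLUSH_SIZE set above
      double factor = 0.5;          // IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY
      return activeSegmentDataSize > factor * flushSize;
    }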
@@ -676,7 +652,6 @@ public class TestWalAndCompactingMemStoreFlush {
     // Find the sizes of the memstores of each CF.
     MemstoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getSizeOfMemStore();
-    //boolean oldCF2 = region.getStore(FAMILY2).isSloppyMemstore();
     MemstoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getSizeOfMemStore();
     MemstoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getSizeOfMemStore();
@@ -687,20 +662,16 @@ public class TestWalAndCompactingMemStoreFlush {
     // The total memstore size should be the same as the sum of the sizes of
     // memstores of CF1, CF2 and CF3.
-    String msg = "\n<<< totalMemstoreSize=" + totalMemstoreSize +
-        " DefaultMemStore.DEEP_OVERHEAD=" + DefaultMemStore.DEEP_OVERHEAD +
-        " cf1MemstoreSizePhaseI=" + cf1MemstoreSizePhaseI +
-        " cf2MemstoreSizePhaseI=" + cf2MemstoreSizePhaseI +
-        " cf3MemstoreSizePhaseI=" + cf3MemstoreSizePhaseI;
-    assertEquals(msg, totalMemstoreSize,
-        cf1MemstoreSizePhaseI.getDataSize() + cf2MemstoreSizePhaseI.getDataSize()
-        + cf3MemstoreSizePhaseI.getDataSize());
+    String msg = "totalMemstoreSize="+totalMemstoreSize +
+        " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD +
+        " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI +
+        " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI +
+        " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ;
+    assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
+        + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
     // Flush!
     CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
-    MemStore cms2 = ((HStore) region.getStore(FAMILY2)).memstore;
-    MemstoreSize memstrsize2 = cms2.getSnapshotSize();
-    MemstoreSize flshsize2 = cms2.getFlushableSize();
     CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
     cms1.flushInMemory();
     cms3.flushInMemory();
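Note that flushInMemory() only kicks off the in-memory compaction; code that samples memstore sizes right afterwards usually has to wait for the background step to settle. A usage sketch of that wait, assuming isMemStoreFlushingInMemory is available as CompactingMemStore's testing hook and Threads is the HBase utility class:

    // Usage sketch: wait for the scheduled in-memory compactions to finish
    // before sampling sizes, so the assertions below don't race with them.
    while (cms1.isMemStoreFlushingInMemory() || cms3.isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }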
@@ -713,22 +684,15 @@ public class TestWalAndCompactingMemStoreFlush {
     long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
     long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
     long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
-    MemstoreSize newSize = new MemstoreSize();
     // CF2 should have been cleared
-    assertEquals(
-        msg + "\n<<< CF2 is compacting " + ((HStore) region.getStore(FAMILY2)).memstore.isSloppy()
-        + ", snapshot and flushable size BEFORE flush " + memstrsize2 + "; " + flshsize2
-        + ", snapshot and flushable size AFTER flush " + cms2.getSnapshotSize() + "; " + cms2
-        .getFlushableSize() + "\n<<< cf2 size " + cms2.size() + "; the checked size "
-        + cf2MemstoreSizePhaseII + "; memstore empty size " + MemstoreSize.EMPTY_SIZE
-        + "; check size " + checkSize + "\n<<< first first first CF2 size "
-        + cf2MemstoreSizePhase0 + "; first first first CF1 size " + cf1MemstoreSizePhase0
-        + "; new new new size " + newSize + "\n", MemstoreSize.EMPTY_SIZE,
-        cf2MemstoreSizePhaseII);
+    assertEquals(MemstoreSize.EMPTY_SIZE, cf2MemstoreSizePhaseII);
-    String s = "\n\n----------------------------------\n" + "Upon initial insert and flush, LSN of CF1 is:"
-        + smallestSeqCF1PhaseII + ". LSN of CF2 is:" + smallestSeqCF2PhaseII + ". LSN of CF3 is:" + smallestSeqCF3PhaseII + ", smallestSeqInRegionCurrentMemstore:"
+    String s = "\n\n----------------------------------\n"
+        + "Upon initial insert and flush, LSN of CF1 is:"
+        + smallestSeqCF1PhaseII + ". LSN of CF2 is:"
+        + smallestSeqCF2PhaseII + ". LSN of CF3 is:"
+        + smallestSeqCF3PhaseII + ", smallestSeqInRegionCurrentMemstore:"
         + smallestSeqInRegionCurrentMemstorePhaseII + "\n";
     // Add same entries to compact them later
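The duplicate entries matter because eager (data) compaction keeps only the most recent version of each cell, letting each store release its oldest sequence ids; that is what the PhaseIV-greater-than-PhaseIII assertions further down verify. A toy illustration with hypothetical sequence ids, single-version semantics assumed:

    // Toy illustration: two puts to the same cell collapse to one after eager
    // in-memory compaction, so the store's oldest unflushed seqId can advance.
    region.put(createPut(1, 7));   // duplicate, assigned older seqId n
    region.put(createPut(1, 7));   // survivor, assigned newer seqId m > n
    // after compaction only seqId m remains in the memstore for this cell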
@@ -754,8 +718,8 @@ public class TestWalAndCompactingMemStoreFlush {
     s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIII
         + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIII + ", " +
-        "the smallest sequence in CF2:" + smallestSeqCF2PhaseIII + ", the smallest sequence in CF3:"
-        + smallestSeqCF3PhaseIII + "\n";
+        "the smallest sequence in CF2:"
+        + smallestSeqCF2PhaseIII +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIII + "\n";
     // Flush!
     cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
@@ -772,22 +736,20 @@ public class TestWalAndCompactingMemStoreFlush {
     s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV
         + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " +
-        "the smallest sequence in CF2:" + smallestSeqCF2PhaseIV + ", the smallest sequence in CF3:"
-        + smallestSeqCF3PhaseIV + "\n";
+        "the smallest sequence in CF2:"
+        + smallestSeqCF2PhaseIV +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIV + "\n";
     // now check that the LSN of the entire WAL, of CF1 and of CF3 has progressed due to compaction
-    assertTrue(s,
-        smallestSeqInRegionCurrentMemstorePhaseIV > smallestSeqInRegionCurrentMemstorePhaseIII);
+    assertTrue(s, smallestSeqInRegionCurrentMemstorePhaseIV >
+        smallestSeqInRegionCurrentMemstorePhaseIII);
     assertTrue(smallestSeqCF1PhaseIV > smallestSeqCF1PhaseIII);
     assertTrue(smallestSeqCF3PhaseIV > smallestSeqCF3PhaseIII);
     HBaseTestingUtility.closeRegionAndWAL(region);
   }

-  // test WAL behavior together with selective flush while index-compaction
   @Test(timeout = 180000)
-  public void tstICwithWAL() throws IOException {
+  public void testSelectiveFlushAndWALinIndexCompaction() throws IOException {
     // Set up the configuration
     Configuration conf = HBaseConfiguration.create();
     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);