HBASE-14919 Refactoring for in-memory flush and compaction

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
eshcar 2016-02-08 23:35:02 +02:00 committed by stack
parent a975408b7c
commit 25dfc112dd
23 changed files with 2218 additions and 1103 deletions

View File

@ -0,0 +1,497 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.SortedSet;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* An abstract class, which implements the behaviour shared by all concrete memstore instances.
*/
@InterfaceAudience.Private
public abstract class AbstractMemStore implements MemStore {
private static final long NO_SNAPSHOT_ID = -1;
private final Configuration conf;
private final CellComparator comparator;
// active segment absorbs write operations
private volatile MutableSegment active;
// Snapshot of memstore. Made for flusher.
private volatile ImmutableSegment snapshot;
protected volatile long snapshotId;
// Used to track when to flush
private volatile long timeOfOldestEdit;
public final static long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
(4 * ClassSize.REFERENCE) +
(2 * Bytes.SIZEOF_LONG));
public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
2 * (ClassSize.ATOMIC_LONG + ClassSize.TIMERANGE_TRACKER +
ClassSize.CELL_SKIPLIST_SET + ClassSize.CONCURRENT_SKIPLISTMAP));
protected AbstractMemStore(final Configuration conf, final CellComparator c) {
this.conf = conf;
this.comparator = c;
resetCellSet();
this.snapshot = SegmentFactory.instance().createImmutableSegment(conf, c, 0);
this.snapshotId = NO_SNAPSHOT_ID;
}
protected void resetCellSet() {
// Reset heap to not include any keys
this.active = SegmentFactory.instance().createMutableSegment(
conf, comparator, DEEP_OVERHEAD);
this.timeOfOldestEdit = Long.MAX_VALUE;
}
/*
* Calculate how the MemStore size has changed. Includes overhead of the
* backing Map.
* @param cell
* @param notPresent True if the cell was NOT present in the set.
* @return change in size
*/
static long heapSizeChange(final Cell cell, final boolean notPresent) {
return notPresent ? ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY
+ CellUtil.estimatedHeapSizeOf(cell)) : 0;
}
/**
* Updates the wal with the lowest sequence id (oldest entry) that is still in memory
* @param onlyIfMoreRecent a flag that marks whether to update the sequence id no matter what or
* only if it is greater than the previous sequence id
*/
public abstract void updateLowestUnflushedSequenceIdInWal(boolean onlyIfMoreRecent);
/**
* Write an update
* @param cell the cell to be added
* @return approximate size of the passed cell & newly added cell which maybe different than the
* passed-in cell
*/
@Override
public long add(Cell cell) {
Cell toAdd = maybeCloneWithAllocator(cell);
return internalAdd(toAdd);
}
/**
* Update or insert the specified Cells.
* <p>
* For each Cell, insert into MemStore. This will atomically upsert the
* value for that row/family/qualifier. If a Cell did already exist,
* it will then be removed.
* <p>
* Currently the memstoreTS is kept at 0 so as each insert happens, it will
* be immediately visible. May want to change this so it is atomic across
* all Cells.
* <p>
* This is called under row lock, so Get operations will still see updates
* atomically. Scans will only see each Cell update as atomic.
*
* @param cells the cells to be updated
* @param readpoint readpoint below which we can safely remove duplicate KVs
* @return change in memstore size
*/
@Override
public long upsert(Iterable<Cell> cells, long readpoint) {
long size = 0;
for (Cell cell : cells) {
size += upsert(cell, readpoint);
}
return size;
}
/**
* @return Oldest timestamp of all the Cells in the MemStore
*/
@Override
public long timeOfOldestEdit() {
return timeOfOldestEdit;
}
/**
* Write a delete
* @param deleteCell the cell to be deleted
* @return approximate size of the passed key and value.
*/
@Override
public long delete(Cell deleteCell) {
Cell toAdd = maybeCloneWithAllocator(deleteCell);
long s = internalAdd(toAdd);
return s;
}
/**
* An override on snapshot so the no arg version of the method implies zero seq num,
* like for cases without wal
*/
public MemStoreSnapshot snapshot() {
return snapshot(0);
}
/**
* The passed snapshot was successfully persisted; it can be let go.
* @param id Id of the snapshot to clean out.
* @see MemStore#snapshot(long)
*/
@Override
public void clearSnapshot(long id) throws UnexpectedStateException {
if (this.snapshotId != id) {
throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed "
+ id);
}
// OK. Passed in snapshot is same as current snapshot. If not-empty,
// create a new snapshot and let the old one go.
Segment oldSnapshot = this.snapshot;
if (!this.snapshot.isEmpty()) {
this.snapshot = SegmentFactory.instance().createImmutableSegment(
getComparator(), 0);
}
this.snapshotId = NO_SNAPSHOT_ID;
oldSnapshot.close();
}
/**
* Get the entire heap usage for this MemStore not including keys in the
* snapshot.
*/
@Override
public long heapSize() {
return getActive().getSize();
}
/**
* On flush, how much memory we will clear from the active cell set.
*
* @return size of data that is going to be flushed from active set
*/
@Override
public long getFlushableSize() {
long snapshotSize = getSnapshot().getSize();
return snapshotSize > 0 ? snapshotSize : keySize();
}
/**
* @return a list containing a single memstore scanner.
*/
@Override
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(this, readPt));
}
@Override
public long getSnapshotSize() {
return getSnapshot().getSize();
}
@Override
public String toString() {
StringBuffer buf = new StringBuffer();
int i = 1;
try {
for (Segment segment : getListOfSegments()) {
buf.append("Segment (" + i + ") " + segment.toString() + "; ");
i++;
}
} catch (IOException e){
return e.toString();
}
return buf.toString();
}
protected void rollbackInSnapshot(Cell cell) {
// If the key is in the snapshot, delete it. We should not update
// this.size, because that tracks the size of only the memstore and
// not the snapshot. The flush of this snapshot to disk has not
// yet started because Store.flush() waits for all rwcc transactions to
// commit before starting the flush to disk.
snapshot.rollback(cell);
}
protected void rollbackInActive(Cell cell) {
// If the key is in the memstore, delete it. Update this.size.
long sz = active.rollback(cell);
if (sz != 0) {
setOldestEditTimeToNow();
}
}
protected Configuration getConfiguration() {
return conf;
}
protected void dump(Log log) {
active.dump(log);
snapshot.dump(log);
}
/**
* Inserts the specified Cell into MemStore and deletes any existing
* versions of the same row/family/qualifier as the specified Cell.
* <p>
* First, the specified Cell is inserted into the Memstore.
* <p>
* If there are any existing Cell in this MemStore with the same row,
* family, and qualifier, they are removed.
* <p>
* Callers must hold the read lock.
*
* @param cell the cell to be updated
* @param readpoint readpoint below which we can safely remove duplicate KVs
* @return change in size of MemStore
*/
private long upsert(Cell cell, long readpoint) {
// Add the Cell to the MemStore
// Use the internalAdd method here since we (a) already have a lock
// and (b) cannot safely use the MSLAB here without potentially
// hitting OOME - see TestMemStore.testUpsertMSLAB for a
// test that triggers the pathological case if we don't avoid MSLAB
// here.
long addedSize = internalAdd(cell);
// Get the Cells for the row/family/qualifier regardless of timestamp.
// For this case we want to clean up any other puts
Cell firstCell = KeyValueUtil.createFirstOnRow(
cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
SortedSet<Cell> ss = active.tailSet(firstCell);
Iterator<Cell> it = ss.iterator();
// versions visible to oldest scanner
int versionsVisible = 0;
while (it.hasNext()) {
Cell cur = it.next();
if (cell == cur) {
// ignore the one just put in
continue;
}
// check that this is the row and column we are interested in, otherwise bail
if (CellUtil.matchingRow(cell, cur) && CellUtil.matchingQualifier(cell, cur)) {
// only remove Puts that concurrent scanners cannot possibly see
if (cur.getTypeByte() == KeyValue.Type.Put.getCode() &&
cur.getSequenceId() <= readpoint) {
if (versionsVisible >= 1) {
// if we get here we have seen at least one version visible to the oldest scanner,
// which means we can prove that no scanner will see this version
// false means there was a change, so give us the size.
long delta = heapSizeChange(cur, true);
addedSize -= delta;
active.incSize(-delta);
it.remove();
setOldestEditTimeToNow();
} else {
versionsVisible++;
}
}
} else {
// past the row or column, done
break;
}
}
return addedSize;
}
/*
* @param a
* @param b
* @return Return lowest of a or b or null if both a and b are null
*/
protected Cell getLowest(final Cell a, final Cell b) {
if (a == null) {
return b;
}
if (b == null) {
return a;
}
return comparator.compareRows(a, b) <= 0? a: b;
}
/*
* @param key Find row that follows this one. If null, return first.
* @param set Set to look in for a row beyond <code>row</code>.
* @return Next row or null if none found. If one found, will be a new
* KeyValue -- can be destroyed by subsequent calls to this method.
*/
protected Cell getNextRow(final Cell key,
final NavigableSet<Cell> set) {
Cell result = null;
SortedSet<Cell> tail = key == null? set: set.tailSet(key);
// Iterate until we fall into the next row; i.e. move off current row
for (Cell cell: tail) {
if (comparator.compareRows(cell, key) <= 0) {
continue;
}
// Note: Not suppressing deletes or expired cells. Needs to be handled
// by higher up functions.
result = cell;
break;
}
return result;
}
/**
* Given the specs of a column, update it, first by inserting a new record,
* then removing the old one. Since there is only 1 KeyValue involved, the memstoreTS
* will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
* store will ensure that the insert/delete each are atomic. A scanner/reader will either
* get the new value, or the old value and all readers will eventually only see the new
* value after the old was removed.
*/
@VisibleForTesting
@Override
public long updateColumnValue(byte[] row, byte[] family, byte[] qualifier,
long newValue, long now) {
Cell firstCell = KeyValueUtil.createFirstOnRow(row, family, qualifier);
// Is there a Cell in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
Cell snc = snapshot.getFirstAfter(firstCell);
if(snc != null) {
// is there a matching Cell in the snapshot?
if (CellUtil.matchingRow(snc, firstCell) && CellUtil.matchingQualifier(snc, firstCell)) {
if (snc.getTimestamp() == now) {
now += 1;
}
}
}
// logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
// But the timestamp should also be max(now, mostRecentTsInMemstore)
// so we cant add the new Cell w/o knowing what's there already, but we also
// want to take this chance to delete some cells. So two loops (sad)
SortedSet<Cell> ss = getActive().tailSet(firstCell);
for (Cell cell : ss) {
// if this isnt the row we are interested in, then bail:
if (!CellUtil.matchingColumn(cell, family, qualifier)
|| !CellUtil.matchingRow(cell, firstCell)) {
break; // rows dont match, bail.
}
// if the qualifier matches and it's a put, just RM it out of the active.
if (cell.getTypeByte() == KeyValue.Type.Put.getCode() &&
cell.getTimestamp() > now && CellUtil.matchingQualifier(firstCell, cell)) {
now = cell.getTimestamp();
}
}
// create or update (upsert) a new Cell with
// 'now' and a 0 memstoreTS == immediately visible
List<Cell> cells = new ArrayList<Cell>(1);
cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
return upsert(cells, 1L);
}
private Cell maybeCloneWithAllocator(Cell cell) {
return active.maybeCloneWithAllocator(cell);
}
/**
* Internal version of add() that doesn't clone Cells with the
* allocator, and doesn't take the lock.
*
* Callers should ensure they already have the read lock taken
*/
private long internalAdd(final Cell toAdd) {
long s = active.add(toAdd);
setOldestEditTimeToNow();
checkActiveSize();
return s;
}
private void setOldestEditTimeToNow() {
if (timeOfOldestEdit == Long.MAX_VALUE) {
timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
}
}
protected long keySize() {
return heapSize() - DEEP_OVERHEAD;
}
protected CellComparator getComparator() {
return comparator;
}
protected MutableSegment getActive() {
return active;
}
protected ImmutableSegment getSnapshot() {
return snapshot;
}
protected AbstractMemStore setSnapshot(ImmutableSegment snapshot) {
this.snapshot = snapshot;
return this;
}
protected void setSnapshotSize(long snapshotSize) {
getSnapshot().setSize(snapshotSize);
}
/**
* Check whether anything need to be done based on the current active set size
*/
protected abstract void checkActiveSize();
/**
* Returns a list of Store segment scanners, one per each store segment
* @param readPt the version number required to initialize the scanners
* @return a list of Store segment scanners, one per each store segment
*/
protected abstract List<SegmentScanner> getListOfScanners(long readPt) throws IOException;
/**
* Returns an ordered list of segments from most recent to oldest in memstore
* @return an ordered list of segments from most recent to oldest in memstore
*/
protected abstract List<Segment> getListOfSegments() throws IOException;
public long getActiveSize() {
return getActive().getSize();
}
}

View File

@ -31,28 +31,26 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
/** /**
* A {@link java.util.Set} of {@link Cell}s implemented on top of a * A {@link java.util.Set} of {@link Cell}s, where an add will overwrite the entry if already
* {@link java.util.concurrent.ConcurrentSkipListMap}. Works like a * exists in the set. The call to add returns true if no value in the backing map or false if
* {@link java.util.concurrent.ConcurrentSkipListSet} in all but one regard: * there was an entry with same key (though value may be different).
* An add will overwrite if already an entry for the added key. In other words, * implementation is tolerant of concurrent get and set and won't throw
* where CSLS does "Adds the specified element to this set if it is not already * ConcurrentModificationException when iterating.
* present.", this implementation "Adds the specified element to this set EVEN
* if it is already present overwriting what was there previous". The call to
* add returns true if no value in the backing map or false if there was an
* entry with same key (though value may be different).
* <p>Otherwise,
* has same attributes as ConcurrentSkipListSet: e.g. tolerant of concurrent
* get and set and won't throw ConcurrentModificationException when iterating.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class CellSkipListSet implements NavigableSet<Cell> { public class CellSet implements NavigableSet<Cell> {
// Implemented on top of a {@link java.util.concurrent.ConcurrentSkipListMap}
// Differ from CSLS in one respect, where CSLS does "Adds the specified element to this set if it
// is not already present.", this implementation "Adds the specified element to this set EVEN
// if it is already present overwriting what was there previous".
// Otherwise, has same attributes as ConcurrentSkipListSet
private final ConcurrentNavigableMap<Cell, Cell> delegatee; private final ConcurrentNavigableMap<Cell, Cell> delegatee;
CellSkipListSet(final CellComparator c) { CellSet(final CellComparator c) {
this.delegatee = new ConcurrentSkipListMap<Cell, Cell>(c); this.delegatee = new ConcurrentSkipListMap<Cell, Cell>(c);
} }
CellSkipListSet(final ConcurrentNavigableMap<Cell, Cell> m) { CellSet(final ConcurrentNavigableMap<Cell, Cell> m) {
this.delegatee = m; this.delegatee = m;
} }
@ -78,7 +76,7 @@ public class CellSkipListSet implements NavigableSet<Cell> {
public NavigableSet<Cell> headSet(final Cell toElement, public NavigableSet<Cell> headSet(final Cell toElement,
boolean inclusive) { boolean inclusive) {
return new CellSkipListSet(this.delegatee.headMap(toElement, inclusive)); return new CellSet(this.delegatee.headMap(toElement, inclusive));
} }
public Cell higher(Cell e) { public Cell higher(Cell e) {
@ -115,7 +113,7 @@ public class CellSkipListSet implements NavigableSet<Cell> {
} }
public NavigableSet<Cell> tailSet(Cell fromElement, boolean inclusive) { public NavigableSet<Cell> tailSet(Cell fromElement, boolean inclusive) {
return new CellSkipListSet(this.delegatee.tailMap(fromElement, inclusive)); return new CellSet(this.delegatee.tailMap(fromElement, inclusive));
} }
public Comparator<? super Cell> comparator() { public Comparator<? super Cell> comparator() {

View File

@ -19,35 +19,22 @@
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean; import java.lang.management.RuntimeMXBean;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.NavigableSet;
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.ByteRange;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CollectionBackedScanner;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.htrace.Trace;
/** /**
* The MemStore holds in-memory modifications to the Store. Modifications * The MemStore holds in-memory modifications to the Store. Modifications
@ -66,40 +53,8 @@ import org.apache.htrace.Trace;
* in KV size. * in KV size.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class DefaultMemStore implements MemStore { public class DefaultMemStore extends AbstractMemStore {
private static final Log LOG = LogFactory.getLog(DefaultMemStore.class); private static final Log LOG = LogFactory.getLog(DefaultMemStore.class);
static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled";
private static final boolean USEMSLAB_DEFAULT = true;
static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class";
private Configuration conf;
// MemStore. Use a CellSkipListSet rather than SkipListSet because of the
// better semantics. The Map will overwrite if passed a key it already had
// whereas the Set will not add new Cell if key is same though value might be
// different. Value is not important -- just make sure always same
// reference passed.
volatile CellSkipListSet cellSet;
// Snapshot of memstore. Made for flusher.
volatile CellSkipListSet snapshot;
final CellComparator comparator;
// Used to track own heapSize
final AtomicLong size;
private volatile long snapshotSize;
// Used to track when to flush
volatile long timeOfOldestEdit = Long.MAX_VALUE;
TimeRangeTracker timeRangeTracker;
TimeRangeTracker snapshotTimeRangeTracker;
volatile MemStoreLAB allocator;
volatile MemStoreLAB snapshotAllocator;
volatile long snapshotId;
volatile boolean tagsPresent;
/** /**
* Default constructor. Used for tests. * Default constructor. Used for tests.
@ -112,183 +67,54 @@ public class DefaultMemStore implements MemStore {
* Constructor. * Constructor.
* @param c Comparator * @param c Comparator
*/ */
public DefaultMemStore(final Configuration conf, public DefaultMemStore(final Configuration conf, final CellComparator c) {
final CellComparator c) { super(conf, c);
this.conf = conf;
this.comparator = c;
this.cellSet = new CellSkipListSet(c);
this.snapshot = new CellSkipListSet(c);
timeRangeTracker = new TimeRangeTracker();
snapshotTimeRangeTracker = new TimeRangeTracker();
this.size = new AtomicLong(DEEP_OVERHEAD);
this.snapshotSize = 0;
if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
new Class[] { Configuration.class }, new Object[] { conf });
} else {
this.allocator = null;
}
} }
void dump() { void dump() {
for (Cell cell: this.cellSet) { super.dump(LOG);
LOG.info(cell);
}
for (Cell cell: this.snapshot) {
LOG.info(cell);
}
} }
/** /**
* Creates a snapshot of the current memstore. * Creates a snapshot of the current memstore.
* Snapshot must be cleared by call to {@link #clearSnapshot(long)} * Snapshot must be cleared by call to {@link #clearSnapshot(long)}
* @param flushOpSeqId the sequence id that is attached to the flush operation in the wal
*/ */
@Override @Override
public MemStoreSnapshot snapshot() { public MemStoreSnapshot snapshot(long flushOpSeqId) {
// If snapshot currently has entries, then flusher failed or didn't call // If snapshot currently has entries, then flusher failed or didn't call
// cleanup. Log a warning. // cleanup. Log a warning.
if (!this.snapshot.isEmpty()) { if (!getSnapshot().isEmpty()) {
LOG.warn("Snapshot called again without clearing previous. " + LOG.warn("Snapshot called again without clearing previous. " +
"Doing nothing. Another ongoing flush or did we fail last attempt?"); "Doing nothing. Another ongoing flush or did we fail last attempt?");
} else { } else {
this.snapshotId = EnvironmentEdgeManager.currentTime(); this.snapshotId = EnvironmentEdgeManager.currentTime();
this.snapshotSize = keySize(); if (!getActive().isEmpty()) {
if (!this.cellSet.isEmpty()) { ImmutableSegment immutableSegment = SegmentFactory.instance().
this.snapshot = this.cellSet; createImmutableSegment(getConfiguration(), getActive());
this.cellSet = new CellSkipListSet(this.comparator); setSnapshot(immutableSegment);
this.snapshotTimeRangeTracker = this.timeRangeTracker; setSnapshotSize(keySize());
this.timeRangeTracker = new TimeRangeTracker(); resetCellSet();
// Reset heap to not include any keys
this.size.set(DEEP_OVERHEAD);
this.snapshotAllocator = this.allocator;
// Reset allocator so we get a fresh buffer for the new memstore
if (allocator != null) {
String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
new Class[] { Configuration.class }, new Object[] { conf });
}
timeOfOldestEdit = Long.MAX_VALUE;
} }
} }
MemStoreSnapshot memStoreSnapshot = new MemStoreSnapshot(this.snapshotId, snapshot.size(), this.snapshotSize, return new MemStoreSnapshot(this.snapshotId, getSnapshot());
this.snapshotTimeRangeTracker, new CollectionBackedScanner(snapshot, this.comparator),
this.tagsPresent);
this.tagsPresent = false;
return memStoreSnapshot;
}
/**
* The passed snapshot was successfully persisted; it can be let go.
* @param id Id of the snapshot to clean out.
* @throws UnexpectedStateException
* @see #snapshot()
*/
@Override
public void clearSnapshot(long id) throws UnexpectedStateException {
MemStoreLAB tmpAllocator = null;
if (this.snapshotId != id) {
throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed "
+ id);
}
// OK. Passed in snapshot is same as current snapshot. If not-empty,
// create a new snapshot and let the old one go.
if (!this.snapshot.isEmpty()) {
this.snapshot = new CellSkipListSet(this.comparator);
this.snapshotTimeRangeTracker = new TimeRangeTracker();
}
this.snapshotSize = 0;
this.snapshotId = -1;
if (this.snapshotAllocator != null) {
tmpAllocator = this.snapshotAllocator;
this.snapshotAllocator = null;
}
if (tmpAllocator != null) {
tmpAllocator.close();
}
} }
@Override @Override
public long getFlushableSize() { protected List<SegmentScanner> getListOfScanners(long readPt) throws IOException {
return this.snapshotSize > 0 ? this.snapshotSize : keySize(); List<SegmentScanner> list = new ArrayList<SegmentScanner>(2);
list.add(0, getActive().getSegmentScanner(readPt));
list.add(1, getSnapshot().getSegmentScanner(readPt));
return list;
} }
@Override @Override
public long getSnapshotSize() { protected List<Segment> getListOfSegments() throws IOException {
return this.snapshotSize; List<Segment> list = new ArrayList<Segment>(2);
} list.add(0, getActive());
list.add(1, getSnapshot());
/** return list;
* Write an update
* @param cell
* @return approximate size of the passed Cell.
*/
@Override
public long add(Cell cell) {
Cell toAdd = maybeCloneWithAllocator(cell);
return internalAdd(toAdd);
}
@Override
public long timeOfOldestEdit() {
return timeOfOldestEdit;
}
private boolean addToCellSet(Cell e) {
boolean b = this.cellSet.add(e);
// In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call.
// When we use ACL CP or Visibility CP which deals with Tags during
// mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not
// parse the byte[] to identify the tags length.
if(e.getTagsLength() > 0) {
tagsPresent = true;
}
setOldestEditTimeToNow();
return b;
}
private boolean removeFromCellSet(Cell e) {
boolean b = this.cellSet.remove(e);
setOldestEditTimeToNow();
return b;
}
void setOldestEditTimeToNow() {
if (timeOfOldestEdit == Long.MAX_VALUE) {
timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
}
}
/**
* Internal version of add() that doesn't clone Cells with the
* allocator, and doesn't take the lock.
*
* Callers should ensure they already have the read lock taken
*/
private long internalAdd(final Cell toAdd) {
long s = heapSizeChange(toAdd, addToCellSet(toAdd));
timeRangeTracker.includeTimestamp(toAdd);
this.size.addAndGet(s);
return s;
}
private Cell maybeCloneWithAllocator(Cell cell) {
if (allocator == null) {
return cell;
}
int len = KeyValueUtil.length(cell);
ByteRange alloc = allocator.allocateBytes(len);
if (alloc == null) {
// The allocation was too large, allocator decided
// not to do anything with it.
return cell;
}
assert alloc.getBytes() != null;
KeyValueUtil.appendToByteArray(cell, alloc.getBytes(), alloc.getOffset());
KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len);
newKv.setSequenceId(cell.getSequenceId());
return newKv;
} }
/** /**
@ -301,39 +127,8 @@ public class DefaultMemStore implements MemStore {
*/ */
@Override @Override
public void rollback(Cell cell) { public void rollback(Cell cell) {
// If the key is in the snapshot, delete it. We should not update rollbackInSnapshot(cell);
// this.size, because that tracks the size of only the memstore and rollbackInActive(cell);
// not the snapshot. The flush of this snapshot to disk has not
// yet started because Store.flush() waits for all rwcc transactions to
// commit before starting the flush to disk.
Cell found = this.snapshot.get(cell);
if (found != null && found.getSequenceId() == cell.getSequenceId()) {
this.snapshot.remove(cell);
long sz = heapSizeChange(cell, true);
this.snapshotSize -= sz;
}
// If the key is in the memstore, delete it. Update this.size.
found = this.cellSet.get(cell);
if (found != null && found.getSequenceId() == cell.getSequenceId()) {
removeFromCellSet(cell);
long s = heapSizeChange(cell, true);
this.size.addAndGet(-s);
}
}
/**
* Write a delete
* @param deleteCell
* @return approximate size of the passed key and value.
*/
@Override
public long delete(Cell deleteCell) {
long s = 0;
Cell toAdd = maybeCloneWithAllocator(deleteCell);
s += heapSizeChange(toAdd, addToCellSet(toAdd));
timeRangeTracker.includeTimestamp(toAdd);
this.size.addAndGet(s);
return s;
} }
/** /**
@ -342,606 +137,31 @@ public class DefaultMemStore implements MemStore {
* @return Next row or null if none found. * @return Next row or null if none found.
*/ */
Cell getNextRow(final Cell cell) { Cell getNextRow(final Cell cell) {
return getLowest(getNextRow(cell, this.cellSet), getNextRow(cell, this.snapshot)); return getLowest(
getNextRow(cell, getActive().getCellSet()),
getNextRow(cell, getSnapshot().getCellSet()));
} }
/* @Override public void updateLowestUnflushedSequenceIdInWal(boolean onlyIfMoreRecent) {
* @param a
* @param b
* @return Return lowest of a or b or null if both a and b are null
*/
private Cell getLowest(final Cell a, final Cell b) {
if (a == null) {
return b;
}
if (b == null) {
return a;
}
return comparator.compareRows(a, b) <= 0? a: b;
}
/*
* @param key Find row that follows this one. If null, return first.
* @param map Set to look in for a row beyond <code>row</code>.
* @return Next row or null if none found. If one found, will be a new
* KeyValue -- can be destroyed by subsequent calls to this method.
*/
private Cell getNextRow(final Cell key,
final NavigableSet<Cell> set) {
Cell result = null;
SortedSet<Cell> tail = key == null? set: set.tailSet(key);
// Iterate until we fall into the next row; i.e. move off current row
for (Cell cell: tail) {
if (comparator.compareRows(cell, key) <= 0)
continue;
// Note: Not suppressing deletes or expired cells. Needs to be handled
// by higher up functions.
result = cell;
break;
}
return result;
} }
/** /**
* Only used by tests. TODO: Remove * @return Total memory occupied by this MemStore.
*
* Given the specs of a column, update it, first by inserting a new record,
* then removing the old one. Since there is only 1 KeyValue involved, the memstoreTS
* will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
* store will ensure that the insert/delete each are atomic. A scanner/reader will either
* get the new value, or the old value and all readers will eventually only see the new
* value after the old was removed.
*
* @param row
* @param family
* @param qualifier
* @param newValue
* @param now
* @return Timestamp
*/ */
@Override
public long updateColumnValue(byte[] row,
byte[] family,
byte[] qualifier,
long newValue,
long now) {
Cell firstCell = KeyValueUtil.createFirstOnRow(row, family, qualifier);
// Is there a Cell in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
SortedSet<Cell> snSs = snapshot.tailSet(firstCell);
if (!snSs.isEmpty()) {
Cell snc = snSs.first();
// is there a matching Cell in the snapshot?
if (CellUtil.matchingRow(snc, firstCell) && CellUtil.matchingQualifier(snc, firstCell)) {
if (snc.getTimestamp() == now) {
// poop,
now += 1;
}
}
}
// logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
// But the timestamp should also be max(now, mostRecentTsInMemstore)
// so we cant add the new Cell w/o knowing what's there already, but we also
// want to take this chance to delete some cells. So two loops (sad)
SortedSet<Cell> ss = cellSet.tailSet(firstCell);
for (Cell cell : ss) {
// if this isnt the row we are interested in, then bail:
if (!CellUtil.matchingColumn(cell, family, qualifier)
|| !CellUtil.matchingRow(cell, firstCell)) {
break; // rows dont match, bail.
}
// if the qualifier matches and it's a put, just RM it out of the cellSet.
if (cell.getTypeByte() == KeyValue.Type.Put.getCode() &&
cell.getTimestamp() > now && CellUtil.matchingQualifier(firstCell, cell)) {
now = cell.getTimestamp();
}
}
// create or update (upsert) a new Cell with
// 'now' and a 0 memstoreTS == immediately visible
List<Cell> cells = new ArrayList<Cell>(1);
cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
return upsert(cells, 1L);
}
/**
* Update or insert the specified KeyValues.
* <p>
* For each KeyValue, insert into MemStore. This will atomically upsert the
* value for that row/family/qualifier. If a KeyValue did already exist,
* it will then be removed.
* <p>
* This is called under row lock, so Get operations will still see updates
* atomically. Scans will only see each KeyValue update as atomic.
*
* @param readpoint readpoint below which we can safely remove duplicate KVs
* @return change in memstore size
*/
@Override
public long upsert(Iterable<Cell> cells, long readpoint) {
long size = 0;
for (Cell cell : cells) {
size += upsert(cell, readpoint);
}
return size;
}
/**
* Inserts the specified KeyValue into MemStore and deletes any existing
* versions of the same row/family/qualifier as the specified KeyValue.
* <p>
* First, the specified KeyValue is inserted into the Memstore.
* <p>
* If there are any existing KeyValues in this MemStore with the same row,
* family, and qualifier, they are removed.
* <p>
* Callers must hold the read lock.
* @param readpoint Smallest outstanding readpoint; below which we can remove duplicate Cells.
* @return change in size of MemStore
*/
private long upsert(Cell cell, long readpoint) {
// Add the Cell to the MemStore
// Use the internalAdd method here since we (a) already have a lock
// and (b) cannot safely use the MSLAB here without potentially
// hitting OOME - see TestMemStore.testUpsertMSLAB for a
// test that triggers the pathological case if we don't avoid MSLAB
// here.
long addedSize = internalAdd(cell);
// Get the Cells for the row/family/qualifier regardless of timestamp.
// For this case we want to clean up any other puts
Cell firstCell = KeyValueUtil.createFirstOnRow(
cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
SortedSet<Cell> ss = cellSet.tailSet(firstCell);
Iterator<Cell> it = ss.iterator();
// Versions visible to oldest scanner.
int versionsVisible = 0;
while ( it.hasNext() ) {
Cell cur = it.next();
if (cell == cur) {
// ignore the one just put in
continue;
}
// check that this is the row and column we are interested in, otherwise bail
if (CellUtil.matchingRow(cell, cur) && CellUtil.matchingQualifier(cell, cur)) {
// only remove Puts that concurrent scanners cannot possibly see
if (cur.getTypeByte() == KeyValue.Type.Put.getCode() &&
cur.getSequenceId() <= readpoint) {
if (versionsVisible >= 1) {
// if we get here we have seen at least one version visible to the oldest scanner,
// which means we can prove that no scanner will see this version
// false means there was a change, so give us the size.
long delta = heapSizeChange(cur, true);
addedSize -= delta;
this.size.addAndGet(-delta);
it.remove();
setOldestEditTimeToNow();
} else {
versionsVisible++;
}
}
} else {
// past the row or column, done
break;
}
}
return addedSize;
}
/**
* @return scanner on memstore and snapshot in this order.
*/
@Override
public List<KeyValueScanner> getScanners(long readPt) {
return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(readPt));
}
/**
* Check if this memstore may contain the required keys
* @param scan scan
* @param store holds reference to cf
* @param oldestUnexpiredTS
* @return False if the key definitely does not exist in this Memstore
*/
public boolean shouldSeek(Scan scan, Store store, long oldestUnexpiredTS) {
byte[] cf = store.getFamily().getName();
TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf);
if (timeRange == null) {
timeRange = scan.getTimeRange();
}
return (timeRangeTracker.includesTimeRange(timeRange) ||
snapshotTimeRangeTracker.includesTimeRange(timeRange))
&& (Math.max(timeRangeTracker.getMaximumTimestamp(),
snapshotTimeRangeTracker.getMaximumTimestamp()) >=
oldestUnexpiredTS);
}
/*
* MemStoreScanner implements the KeyValueScanner.
* It lets the caller scan the contents of a memstore -- both current
* map and snapshot.
* This behaves as if it were a real scanner but does not maintain position.
*/
protected class MemStoreScanner extends NonLazyKeyValueScanner {
// Next row information for either cellSet or snapshot
private Cell cellSetNextRow = null;
private Cell snapshotNextRow = null;
// last iterated Cells for cellSet and snapshot (to restore iterator state after reseek)
private Cell cellSetItRow = null;
private Cell snapshotItRow = null;
// iterator based scanning.
private Iterator<Cell> cellSetIt;
private Iterator<Cell> snapshotIt;
// The cellSet and snapshot at the time of creating this scanner
private CellSkipListSet cellSetAtCreation;
private CellSkipListSet snapshotAtCreation;
// the pre-calculated Cell to be returned by peek() or next()
private Cell theNext;
// The allocator and snapshot allocator at the time of creating this scanner
volatile MemStoreLAB allocatorAtCreation;
volatile MemStoreLAB snapshotAllocatorAtCreation;
// A flag represents whether could stop skipping Cells for MVCC
// if have encountered the next row. Only used for reversed scan
private boolean stopSkippingCellsIfNextRow = false;
private long readPoint;
/*
Some notes...
So memstorescanner is fixed at creation time. this includes pointers/iterators into
existing kvset/snapshot. during a snapshot creation, the kvset is null, and the
snapshot is moved. since kvset is null there is no point on reseeking on both,
we can save us the trouble. During the snapshot->hfile transition, the memstore
scanner is re-created by StoreScanner#updateReaders(). StoreScanner should
potentially do something smarter by adjusting the existing memstore scanner.
But there is a greater problem here, that being once a scanner has progressed
during a snapshot scenario, we currently iterate past the kvset then 'finish' up.
if a scan lasts a little while, there is a chance for new entries in kvset to
become available but we will never see them. This needs to be handled at the
StoreScanner level with coordination with MemStoreScanner.
Currently, this problem is only partly managed: during the small amount of time
when the StoreScanner has not yet created a new MemStoreScanner, we will miss
the adds to kvset in the MemStoreScanner.
*/
MemStoreScanner(long readPoint) {
super();
this.readPoint = readPoint;
cellSetAtCreation = cellSet;
snapshotAtCreation = snapshot;
if (allocator != null) {
this.allocatorAtCreation = allocator;
this.allocatorAtCreation.incScannerCount();
}
if (snapshotAllocator != null) {
this.snapshotAllocatorAtCreation = snapshotAllocator;
this.snapshotAllocatorAtCreation.incScannerCount();
}
if (Trace.isTracing() && Trace.currentSpan() != null) {
Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner");
}
}
/**
* Lock on 'this' must be held by caller.
* @param it
* @return Next Cell
*/
private Cell getNext(Iterator<Cell> it) {
Cell startCell = theNext;
Cell v = null;
try {
while (it.hasNext()) {
v = it.next();
if (v.getSequenceId() <= this.readPoint) {
return v;
}
if (stopSkippingCellsIfNextRow && startCell != null
&& comparator.compareRows(v, startCell) > 0) {
return null;
}
}
return null;
} finally {
if (v != null) {
// in all cases, remember the last Cell iterated to
if (it == snapshotIt) {
snapshotItRow = v;
} else {
cellSetItRow = v;
}
}
}
}
/**
* Set the scanner at the seek key.
* Must be called only once: there is no thread safety between the scanner
* and the memStore.
* @param key seek value
* @return false if the key is null or if there is no data
*/
@Override
public synchronized boolean seek(Cell key) {
if (key == null) {
close();
return false;
}
// kvset and snapshot will never be null.
// if tailSet can't find anything, SortedSet is empty (not null).
cellSetIt = cellSetAtCreation.tailSet(key).iterator();
snapshotIt = snapshotAtCreation.tailSet(key).iterator();
cellSetItRow = null;
snapshotItRow = null;
return seekInSubLists(key);
}
/**
* (Re)initialize the iterators after a seek or a reseek.
*/
private synchronized boolean seekInSubLists(Cell key){
cellSetNextRow = getNext(cellSetIt);
snapshotNextRow = getNext(snapshotIt);
// Calculate the next value
theNext = getLowest(cellSetNextRow, snapshotNextRow);
// has data
return (theNext != null);
}
/**
* Move forward on the sub-lists set previously by seek.
* @param key seek value (should be non-null)
* @return true if there is at least one KV to read, false otherwise
*/
@Override
public synchronized boolean reseek(Cell key) {
/*
See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
This code is executed concurrently with flush and puts, without locks.
Two points must be known when working on this code:
1) It's not possible to use the 'kvTail' and 'snapshot'
variables, as they are modified during a flush.
2) The ideal implementation for performance would use the sub skip list
implicitly pointed by the iterators 'kvsetIt' and
'snapshotIt'. Unfortunately the Java API does not offer a method to
get it. So we remember the last keys we iterated to and restore
the reseeked set to at least that point.
*/
cellSetIt = cellSetAtCreation.tailSet(getHighest(key, cellSetItRow)).iterator();
snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
return seekInSubLists(key);
}
@Override
public synchronized Cell peek() {
//DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
return theNext;
}
@Override
public synchronized Cell next() {
if (theNext == null) {
return null;
}
final Cell ret = theNext;
// Advance one of the iterators
if (theNext == cellSetNextRow) {
cellSetNextRow = getNext(cellSetIt);
} else {
snapshotNextRow = getNext(snapshotIt);
}
// Calculate the next value
theNext = getLowest(cellSetNextRow, snapshotNextRow);
//long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
//DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
// getLowest() + " threadpoint=" + readpoint);
return ret;
}
/*
* Returns the lower of the two key values, or null if they are both null.
* This uses comparator.compare() to compare the KeyValue using the memstore
* comparator.
*/
private Cell getLowest(Cell first, Cell second) {
if (first == null && second == null) {
return null;
}
if (first != null && second != null) {
int compare = comparator.compare(first, second);
return (compare <= 0 ? first : second);
}
return (first != null ? first : second);
}
/*
* Returns the higher of the two cells, or null if they are both null.
* This uses comparator.compare() to compare the Cell using the memstore
* comparator.
*/
private Cell getHighest(Cell first, Cell second) {
if (first == null && second == null) {
return null;
}
if (first != null && second != null) {
int compare = comparator.compare(first, second);
return (compare > 0 ? first : second);
}
return (first != null ? first : second);
}
public synchronized void close() {
this.cellSetNextRow = null;
this.snapshotNextRow = null;
this.cellSetIt = null;
this.snapshotIt = null;
if (allocatorAtCreation != null) {
this.allocatorAtCreation.decScannerCount();
this.allocatorAtCreation = null;
}
if (snapshotAllocatorAtCreation != null) {
this.snapshotAllocatorAtCreation.decScannerCount();
this.snapshotAllocatorAtCreation = null;
}
this.cellSetItRow = null;
this.snapshotItRow = null;
}
/**
* MemStoreScanner returns max value as sequence id because it will
* always have the latest data among all files.
*/
@Override
public long getSequenceID() {
return Long.MAX_VALUE;
}
@Override
public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
return shouldSeek(scan, store, oldestUnexpiredTS);
}
/**
* Seek scanner to the given key first. If it returns false(means
* peek()==null) or scanner's peek row is bigger than row of given key, seek
* the scanner to the previous row of given key
*/
@Override
public synchronized boolean backwardSeek(Cell key) {
seek(key);
if (peek() == null || comparator.compareRows(peek(), key) > 0) {
return seekToPreviousRow(key);
}
return true;
}
/**
* Separately get the KeyValue before the specified key from kvset and
* snapshotset, and use the row of higher one as the previous row of
* specified key, then seek to the first KeyValue of previous row
*/
@Override
public synchronized boolean seekToPreviousRow(Cell originalKey) {
boolean keepSeeking = false;
Cell key = originalKey;
do {
Cell firstKeyOnRow = CellUtil.createFirstOnRow(key);
SortedSet<Cell> cellHead = cellSetAtCreation.headSet(firstKeyOnRow);
Cell cellSetBeforeRow = cellHead.isEmpty() ? null : cellHead.last();
SortedSet<Cell> snapshotHead = snapshotAtCreation
.headSet(firstKeyOnRow);
Cell snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead
.last();
Cell lastCellBeforeRow = getHighest(cellSetBeforeRow, snapshotBeforeRow);
if (lastCellBeforeRow == null) {
theNext = null;
return false;
}
Cell firstKeyOnPreviousRow = CellUtil.createFirstOnRow(lastCellBeforeRow);
this.stopSkippingCellsIfNextRow = true;
seek(firstKeyOnPreviousRow);
this.stopSkippingCellsIfNextRow = false;
if (peek() == null
|| comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) {
keepSeeking = true;
key = firstKeyOnPreviousRow;
continue;
} else {
keepSeeking = false;
}
} while (keepSeeking);
return true;
}
@Override
public synchronized boolean seekToLastRow() {
Cell first = cellSetAtCreation.isEmpty() ? null : cellSetAtCreation
.last();
Cell second = snapshotAtCreation.isEmpty() ? null
: snapshotAtCreation.last();
Cell higherCell = getHighest(first, second);
if (higherCell == null) {
return false;
}
Cell firstCellOnLastRow = CellUtil.createFirstOnRow(higherCell);
if (seek(firstCellOnLastRow)) {
return true;
} else {
return seekToPreviousRow(higherCell);
}
}
}
public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+ (9 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN);
public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
(2 * ClassSize.CELL_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
/*
* Calculate how the MemStore size has changed. Includes overhead of the
* backing Map.
* @param cell
* @param notpresent True if the cell was NOT present in the set.
* @return Size
*/
static long heapSizeChange(final Cell cell, final boolean notpresent) {
return notpresent ? ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY
+ CellUtil.estimatedHeapSizeOf(cell)) : 0;
}
private long keySize() {
return heapSize() - DEEP_OVERHEAD;
}
/**
* Get the entire heap usage for this MemStore not including keys in the
* snapshot.
*/
@Override
public long heapSize() {
return size.get();
}
@Override @Override
public long size() { public long size() {
return heapSize(); return heapSize();
} }
/**
* Check whether anything need to be done based on the current active set size
* Nothing need to be done for the DefaultMemStore
*/
@Override
protected void checkActiveSize() {
return;
}
/** /**
* Code to help figure if our approximation of object heap sizes is close * Code to help figure if our approximation of object heap sizes is close
* enough. See hbase-900. Fills memstores then waits so user can heap * enough. See hbase-900. Fills memstores then waits so user can heap
@ -978,9 +198,6 @@ public class DefaultMemStore implements MemStore {
LOG.info("memstore2 estimated size=" + size); LOG.info("memstore2 estimated size=" + size);
final int seconds = 30; final int seconds = 30;
LOG.info("Waiting " + seconds + " seconds while heap dump is taken"); LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
for (int i = 0; i < seconds; i++) {
// Thread.sleep(1000);
}
LOG.info("Exiting."); LOG.info("Exiting.");
} }

View File

@ -18,6 +18,13 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -91,13 +98,6 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
/** /**
* A Store holds a column family in a Region. Its a memstore and a set of zero * A Store holds a column family in a Region. Its a memstore and a set of zero
* or more StoreFiles, which stretch backwards over time. * or more StoreFiles, which stretch backwards over time.
@ -1636,7 +1636,7 @@ public class HStore implements Store {
this.lock.readLock().unlock(); this.lock.readLock().unlock();
} }
LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName() LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName()
+ ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction" + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
+ (request.isAllFiles() ? " (all files)" : "")); + (request.isAllFiles() ? " (all files)" : ""));
this.region.reportCompactionRequestStart(request.isMajor()); this.region.reportCompactionRequestStart(request.isMajor());
@ -1990,8 +1990,6 @@ public class HStore implements Store {
} }
/** /**
* Used in tests. TODO: Remove
*
* Updates the value for the given row/family/qualifier. This function will always be seen as * Updates the value for the given row/family/qualifier. This function will always be seen as
* atomic by other readers because it only puts a single KV to memstore. Thus no read/write * atomic by other readers because it only puts a single KV to memstore. Thus no read/write
* control necessary. * control necessary.
@ -2002,6 +2000,7 @@ public class HStore implements Store {
* @return memstore size delta * @return memstore size delta
* @throws IOException * @throws IOException
*/ */
@VisibleForTesting
public long updateColumnValue(byte [] row, byte [] f, public long updateColumnValue(byte [] row, byte [] f,
byte [] qualifier, long newValue) byte [] qualifier, long newValue)
throws IOException { throws IOException {
@ -2055,7 +2054,8 @@ public class HStore implements Store {
*/ */
@Override @Override
public void prepare() { public void prepare() {
this.snapshot = memstore.snapshot(); // passing the current sequence number of the wal - to allow bookkeeping in the memstore
this.snapshot = memstore.snapshot(cacheFlushSeqNum);
this.cacheFlushCount = snapshot.getCellsCount(); this.cacheFlushCount = snapshot.getCellsCount();
this.cacheFlushSize = snapshot.getSize(); this.cacheFlushSize = snapshot.getSize();
committedFiles = new ArrayList<Path>(1); committedFiles = new ArrayList<Path>(1);

View File

@ -0,0 +1,72 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* ImmutableSegment is an abstract class that extends the API supported by a {@link Segment},
* and is not needed for a {@link MutableSegment}. Specifically, the method
* {@link ImmutableSegment#getKeyValueScanner()} builds a special scanner for the
* {@link MemStoreSnapshot} object.
* In addition, this class overrides methods that are not likely to be supported by an immutable
* segment, e.g. {@link Segment#rollback(Cell)} and {@link Segment#getCellSet()}, which
* can be very inefficient.
*/
@InterfaceAudience.Private
public abstract class ImmutableSegment extends Segment {
public ImmutableSegment(Segment segment) {
super(segment);
}
/**
* Removes the given cell from this segment.
* By default immutable store segment can not rollback
* It may be invoked by tests in specific cases where it is known to be supported {@link
* ImmutableSegmentAdapter}
*/
@Override
public long rollback(Cell cell) {
return 0;
}
/**
* Returns a set of all the cells in the segment.
* The implementation of this method might be very inefficient for some immutable segments
* that do not maintain a cell set. Therefore by default this method is not supported.
* It may be invoked by tests in specific cases where it is known to be supported {@link
* ImmutableSegmentAdapter}
*/
@Override
public CellSet getCellSet() {
throw new NotImplementedException("Immutable Segment does not support this operation by " +
"default");
}
/**
* Builds a special scanner for the MemStoreSnapshot object that may be different than the
* general segment scanner.
* @return a special scanner for the MemStoreSnapshot object
*/
public abstract KeyValueScanner getKeyValueScanner();
}

View File

@ -0,0 +1,107 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.CollectionBackedScanner;
/**
* This segment is adapting a mutable segment making it into an immutable segment.
* This is used when a mutable segment is moved to being a snapshot or pushed into a compaction
* pipeline, that consists only of immutable segments.
* The compaction may generate different type of immutable segment
*/
@InterfaceAudience.Private
public class ImmutableSegmentAdapter extends ImmutableSegment {
final private MutableSegment adaptee;
public ImmutableSegmentAdapter(MutableSegment segment) {
super(segment);
this.adaptee = segment;
}
@Override
public KeyValueScanner getKeyValueScanner() {
return new CollectionBackedScanner(adaptee.getCellSet(), adaptee.getComparator());
}
@Override
public SegmentScanner getSegmentScanner(long readPoint) {
return adaptee.getSegmentScanner(readPoint);
}
@Override
public boolean isEmpty() {
return adaptee.isEmpty();
}
@Override
public int getCellsCount() {
return adaptee.getCellsCount();
}
@Override
public long add(Cell cell) {
return adaptee.add(cell);
}
@Override
public Cell getFirstAfter(Cell cell) {
return adaptee.getFirstAfter(cell);
}
@Override
public void close() {
adaptee.close();
}
@Override
public Cell maybeCloneWithAllocator(Cell cell) {
return adaptee.maybeCloneWithAllocator(cell);
}
@Override
public Segment setSize(long size) {
adaptee.setSize(size);
return this;
}
@Override
public long getSize() {
return adaptee.getSize();
}
@Override
public long rollback(Cell cell) {
return adaptee.rollback(cell);
}
@Override
public CellSet getCellSet() {
return adaptee.getCellSet();
}
@Override
public void dump(Log log) {
adaptee.dump(log);
}
}

View File

@ -17,10 +17,11 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List; import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.HeapSize;
/** /**
@ -40,11 +41,20 @@ public interface MemStore extends HeapSize {
*/ */
MemStoreSnapshot snapshot(); MemStoreSnapshot snapshot();
/**
* Creates a snapshot of the current memstore. Snapshot must be cleared by call to
* {@link #clearSnapshot(long)}.
* @param flushOpSeqId the current sequence number of the wal; to be attached to the flushed
* segment
* @return {@link MemStoreSnapshot}
*/
MemStoreSnapshot snapshot(long flushOpSeqId);
/** /**
* Clears the current snapshot of the Memstore. * Clears the current snapshot of the Memstore.
* @param id * @param id
* @throws UnexpectedStateException * @throws UnexpectedStateException
* @see #snapshot() * @see #snapshot(long)
*/ */
void clearSnapshot(long id) throws UnexpectedStateException; void clearSnapshot(long id) throws UnexpectedStateException;
@ -128,7 +138,7 @@ public interface MemStore extends HeapSize {
* @return scanner over the memstore. This might include scanner over the snapshot when one is * @return scanner over the memstore. This might include scanner over the snapshot when one is
* present. * present.
*/ */
List<KeyValueScanner> getScanners(long readPt); List<KeyValueScanner> getScanners(long readPt) throws IOException;
/** /**
* @return Total memory occupied by this MemStore. * @return Total memory occupied by this MemStore.

View File

@ -0,0 +1,348 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.htrace.Trace;
/**
* This is the scanner for any MemStore implementation, derived from MemStore.
* The MemStoreScanner combines SegmentScanner from different Segments and
* uses the key-value heap and the reversed key-value heap for the aggregated key-values set.
* It is assumed that only traversing forward or backward is used (without zigzagging in between)
*/
@InterfaceAudience.Private
public class MemStoreScanner extends NonLazyKeyValueScanner {
/**
* Types of cell MemStoreScanner
*/
static public enum Type {
UNDEFINED,
COMPACT_FORWARD,
USER_SCAN_FORWARD,
USER_SCAN_BACKWARD
}
// heap of scanners used for traversing forward
private KeyValueHeap forwardHeap;
// reversed scanners heap for traversing backward
private ReversedKeyValueHeap backwardHeap;
// The type of the scan is defined by constructor
// or according to the first usage
private Type type = Type.UNDEFINED;
private long readPoint;
// remember the initial version of the scanners list
List<SegmentScanner> scanners;
// pointer back to the relevant MemStore
// is needed for shouldSeek() method
private AbstractMemStore backwardReferenceToMemStore;
/**
* Constructor.
* If UNDEFINED type for MemStoreScanner is provided, the forward heap is used as default!
* After constructor only one heap is going to be initialized for entire lifespan
* of the MemStoreScanner. A specific scanner can only be one directional!
*
* @param ms Pointer back to the MemStore
* @param readPoint Read point below which we can safely remove duplicate KVs
* @param type The scan type COMPACT_FORWARD should be used for compaction
*/
public MemStoreScanner(AbstractMemStore ms, long readPoint, Type type) throws IOException {
this(ms, ms.getListOfScanners(readPoint), readPoint, type);
}
/* Constructor used only when the scan usage is unknown
and need to be defined according to the first move */
public MemStoreScanner(AbstractMemStore ms, long readPt) throws IOException {
this(ms, readPt, Type.UNDEFINED);
}
public MemStoreScanner(AbstractMemStore ms, List<SegmentScanner> scanners, long readPoint,
Type type) throws IOException {
super();
this.readPoint = readPoint;
this.type = type;
switch (type) {
case UNDEFINED:
case USER_SCAN_FORWARD:
case COMPACT_FORWARD:
this.forwardHeap = new KeyValueHeap(scanners, ms.getComparator());
break;
case USER_SCAN_BACKWARD:
this.backwardHeap = new ReversedKeyValueHeap(scanners, ms.getComparator());
break;
default:
throw new IllegalArgumentException("Unknown scanner type in MemStoreScanner");
}
this.backwardReferenceToMemStore = ms;
this.scanners = scanners;
if (Trace.isTracing() && Trace.currentSpan() != null) {
Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner");
}
}
/**
* Returns the cell from the top-most scanner without advancing the iterator.
* The backward traversal is assumed, only if specified explicitly
*/
@Override
public synchronized Cell peek() {
if (type == Type.USER_SCAN_BACKWARD) {
return backwardHeap.peek();
}
return forwardHeap.peek();
}
/**
* Gets the next cell from the top-most scanner. Assumed forward scanning.
*/
@Override
public synchronized Cell next() throws IOException {
KeyValueHeap heap = (Type.USER_SCAN_BACKWARD == type) ? backwardHeap : forwardHeap;
// loop over till the next suitable value
// take next value from the heap
for (Cell currentCell = heap.next();
currentCell != null;
currentCell = heap.next()) {
// all the logic of presenting cells is inside the internal SegmentScanners
// located inside the heap
return currentCell;
}
return null;
}
/**
* Set the scanner at the seek key. Assumed forward scanning.
* Must be called only once: there is no thread safety between the scanner
* and the memStore.
*
* @param cell seek value
* @return false if the key is null or if there is no data
*/
@Override
public synchronized boolean seek(Cell cell) throws IOException {
assertForward();
if (cell == null) {
close();
return false;
}
return forwardHeap.seek(cell);
}
/**
* Move forward on the sub-lists set previously by seek. Assumed forward scanning.
*
* @param cell seek value (should be non-null)
* @return true if there is at least one KV to read, false otherwise
*/
@Override
public synchronized boolean reseek(Cell cell) throws IOException {
/*
* See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
* This code is executed concurrently with flush and puts, without locks.
* Two points must be known when working on this code:
* 1) It's not possible to use the 'kvTail' and 'snapshot'
* variables, as they are modified during a flush.
* 2) The ideal implementation for performance would use the sub skip list
* implicitly pointed by the iterators 'kvsetIt' and
* 'snapshotIt'. Unfortunately the Java API does not offer a method to
* get it. So we remember the last keys we iterated to and restore
* the reseeked set to at least that point.
*
* TODO: The above comment copied from the original MemStoreScanner
*/
assertForward();
return forwardHeap.reseek(cell);
}
/**
* MemStoreScanner returns max value as sequence id because it will
* always have the latest data among all files.
*/
@Override
public synchronized long getSequenceID() {
return Long.MAX_VALUE;
}
@Override
public synchronized void close() {
if (forwardHeap != null) {
assert ((type == Type.USER_SCAN_FORWARD) ||
(type == Type.COMPACT_FORWARD) || (type == Type.UNDEFINED));
forwardHeap.close();
forwardHeap = null;
if (backwardHeap != null) {
backwardHeap.close();
backwardHeap = null;
}
} else if (backwardHeap != null) {
assert (type == Type.USER_SCAN_BACKWARD);
backwardHeap.close();
backwardHeap = null;
}
}
/**
* Set the scanner at the seek key. Assumed backward scanning.
*
* @param cell seek value
* @return false if the key is null or if there is no data
*/
@Override
public synchronized boolean backwardSeek(Cell cell) throws IOException {
initBackwardHeapIfNeeded(cell, false);
return backwardHeap.backwardSeek(cell);
}
/**
* Assumed backward scanning.
*
* @param cell seek value
* @return false if the key is null or if there is no data
*/
@Override
public synchronized boolean seekToPreviousRow(Cell cell) throws IOException {
initBackwardHeapIfNeeded(cell, false);
if (backwardHeap.peek() == null) {
restartBackwardHeap(cell);
}
return backwardHeap.seekToPreviousRow(cell);
}
@Override
public synchronized boolean seekToLastRow() throws IOException {
// TODO: it looks like this is how it should be, however ReversedKeyValueHeap class doesn't
// implement seekToLastRow() method :(
// however seekToLastRow() was implemented in internal MemStoreScanner
// so I wonder whether we need to come with our own workaround, or to update
// ReversedKeyValueHeap
return initBackwardHeapIfNeeded(KeyValue.LOWESTKEY, true);
}
/**
* Check if this memstore may contain the required keys
* @return False if the key definitely does not exist in this Memstore
*/
@Override
public synchronized boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
if (type == Type.COMPACT_FORWARD) {
return true;
}
for (SegmentScanner sc : scanners) {
if (sc.shouldSeek(scan, oldestUnexpiredTS)) {
return true;
}
}
return false;
}
// debug method
@Override
public String toString() {
StringBuffer buf = new StringBuffer();
int i = 1;
for (SegmentScanner scanner : scanners) {
buf.append("scanner (" + i + ") " + scanner.toString() + " ||| ");
i++;
}
return buf.toString();
}
/****************** Private methods ******************/
/**
* Restructure the ended backward heap after rerunning a seekToPreviousRow()
* on each scanner
* @return false if given Cell does not exist in any scanner
*/
private boolean restartBackwardHeap(Cell cell) throws IOException {
boolean res = false;
for (SegmentScanner scan : scanners) {
res |= scan.seekToPreviousRow(cell);
}
this.backwardHeap =
new ReversedKeyValueHeap(scanners, backwardReferenceToMemStore.getComparator());
return res;
}
/**
* Checks whether the type of the scan suits the assumption of moving backward
*/
private boolean initBackwardHeapIfNeeded(Cell cell, boolean toLast) throws IOException {
boolean res = false;
if (toLast && (type != Type.UNDEFINED)) {
throw new IllegalStateException(
"Wrong usage of initBackwardHeapIfNeeded in parameters. The type is:" + type.toString());
}
if (type == Type.UNDEFINED) {
// In case we started from peek, release the forward heap
// and build backward. Set the correct type. Thus this turn
// can happen only once
if ((backwardHeap == null) && (forwardHeap != null)) {
forwardHeap.close();
forwardHeap = null;
// before building the heap seek for the relevant key on the scanners,
// for the heap to be built from the scanners correctly
for (SegmentScanner scan : scanners) {
if (toLast) {
res |= scan.seekToLastRow();
} else {
res |= scan.backwardSeek(cell);
}
}
this.backwardHeap =
new ReversedKeyValueHeap(scanners, backwardReferenceToMemStore.getComparator());
type = Type.USER_SCAN_BACKWARD;
}
}
if (type == Type.USER_SCAN_FORWARD) {
throw new IllegalStateException("Traversing backward with forward scan");
}
return res;
}
/**
* Checks whether the type of the scan suits the assumption of moving forward
*/
private void assertForward() throws IllegalStateException {
if (type == Type.UNDEFINED) {
type = Type.USER_SCAN_FORWARD;
}
if (type == Type.USER_SCAN_BACKWARD) {
throw new IllegalStateException("Traversing forward with backward scan");
}
}
}

View File

@ -34,14 +34,13 @@ public class MemStoreSnapshot {
private final KeyValueScanner scanner; private final KeyValueScanner scanner;
private final boolean tagsPresent; private final boolean tagsPresent;
public MemStoreSnapshot(long id, int cellsCount, long size, TimeRangeTracker timeRangeTracker, public MemStoreSnapshot(long id, ImmutableSegment snapshot) {
KeyValueScanner scanner, boolean tagsPresent) {
this.id = id; this.id = id;
this.cellsCount = cellsCount; this.cellsCount = snapshot.getCellsCount();
this.size = size; this.size = snapshot.getSize();
this.timeRangeTracker = timeRangeTracker; this.timeRangeTracker = snapshot.getTimeRangeTracker();
this.scanner = scanner; this.scanner = snapshot.getKeyValueScanner();
this.tagsPresent = tagsPresent; this.tagsPresent = snapshot.isTagsPresent();
} }
/** /**

View File

@ -0,0 +1,153 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.Iterator;
import java.util.SortedSet;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* This mutable store segment encapsulates a mutable cell set and its respective memory allocation
* buffers (MSLAB).
*/
@InterfaceAudience.Private
final class MutableCellSetSegment extends MutableSegment {
private volatile CellSet cellSet;
private final CellComparator comparator;
// Instantiate objects only using factory
MutableCellSetSegment(CellSet cellSet, MemStoreLAB memStoreLAB, long size,
CellComparator comparator) {
super(memStoreLAB, size);
this.cellSet = cellSet;
this.comparator = comparator;
}
@Override
public SegmentScanner getSegmentScanner(long readPoint) {
return new MutableCellSetSegmentScanner(this, readPoint);
}
@Override
public boolean isEmpty() {
return getCellSet().isEmpty();
}
@Override
public int getCellsCount() {
return getCellSet().size();
}
@Override
public long add(Cell cell) {
boolean succ = getCellSet().add(cell);
long s = AbstractMemStore.heapSizeChange(cell, succ);
updateMetaInfo(cell, s);
// In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call.
// When we use ACL CP or Visibility CP which deals with Tags during
// mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not
// parse the byte[] to identify the tags length.
if(cell.getTagsLength() > 0) {
tagsPresent = true;
}
return s;
}
@Override
public long rollback(Cell cell) {
Cell found = get(cell);
if (found != null && found.getSequenceId() == cell.getSequenceId()) {
long sz = AbstractMemStore.heapSizeChange(cell, true);
remove(cell);
incSize(-sz);
return sz;
}
return 0;
}
@Override
public Cell getFirstAfter(Cell cell) {
SortedSet<Cell> snTailSet = tailSet(cell);
if (!snTailSet.isEmpty()) {
return snTailSet.first();
}
return null;
}
@Override
public void dump(Log log) {
for (Cell cell: getCellSet()) {
log.debug(cell);
}
}
@Override
public SortedSet<Cell> tailSet(Cell firstCell) {
return getCellSet().tailSet(firstCell);
}
@Override
public CellSet getCellSet() {
return cellSet;
}
@Override
public CellComparator getComparator() {
return comparator;
}
//*** Methods for MemStoreSegmentsScanner
public Cell last() {
return getCellSet().last();
}
public Iterator<Cell> iterator() {
return getCellSet().iterator();
}
public SortedSet<Cell> headSet(Cell firstKeyOnRow) {
return getCellSet().headSet(firstKeyOnRow);
}
public int compare(Cell left, Cell right) {
return getComparator().compare(left, right);
}
public int compareRows(Cell left, Cell right) {
return getComparator().compareRows(left, right);
}
private Cell get(Cell cell) {
return getCellSet().get(cell);
}
private boolean remove(Cell e) {
return getCellSet().remove(e);
}
// methods for tests
@Override
Cell first() {
return this.getCellSet().first();
}
}

View File

@ -0,0 +1,258 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Iterator;
import java.util.SortedSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* A scanner of a single cells segment {@link MutableCellSetSegment}.
*/
@InterfaceAudience.Private
class MutableCellSetSegmentScanner extends SegmentScanner {
// the observed structure
private final MutableCellSetSegment segment;
// the highest relevant MVCC
private long readPoint;
// the current iterator that can be reinitialized by
// seek(), backwardSeek(), or reseek()
private Iterator<Cell> iter;
// the pre-calculated cell to be returned by peek()
private Cell current = null;
// or next()
// A flag represents whether could stop skipping KeyValues for MVCC
// if have encountered the next row. Only used for reversed scan
private boolean stopSkippingKVsIfNextRow = false;
// last iterated KVs by seek (to restore the iterator state after reseek)
private Cell last = null;
public MutableCellSetSegmentScanner(MutableCellSetSegment segment, long readPoint) {
super();
this.segment = segment;
this.readPoint = readPoint;
iter = segment.iterator();
// the initialization of the current is required for working with heap of SegmentScanners
current = getNext();
//increase the reference count so the underlying structure will not be de-allocated
this.segment.incScannerCount();
}
/**
* Look at the next Cell in this scanner, but do not iterate the scanner
* @return the currently observed Cell
*/
@Override
public Cell peek() { // sanity check, the current should be always valid
if (current!=null && current.getSequenceId() > readPoint) {
throw new RuntimeException("current is invalid: read point is "+readPoint+", " +
"while current sequence id is " +current.getSequenceId());
}
return current;
}
/**
* Return the next Cell in this scanner, iterating the scanner
* @return the next Cell or null if end of scanner
*/
@Override
public Cell next() throws IOException {
Cell oldCurrent = current;
current = getNext(); // update the currently observed Cell
return oldCurrent;
}
/**
* Seek the scanner at or after the specified Cell.
* @param cell seek value
* @return true if scanner has values left, false if end of scanner
*/
@Override
public boolean seek(Cell cell) throws IOException {
if(cell == null) {
close();
return false;
}
// restart the iterator from new key
iter = segment.tailSet(cell).iterator();
// last is going to be reinitialized in the next getNext() call
last = null;
current = getNext();
return (current != null);
}
/**
* Reseek the scanner at or after the specified KeyValue.
* This method is guaranteed to seek at or after the required key only if the
* key comes after the current position of the scanner. Should not be used
* to seek to a key which may come before the current position.
*
* @param cell seek value (should be non-null)
* @return true if scanner has values left, false if end of scanner
*/
@Override
public boolean reseek(Cell cell) throws IOException {
/*
See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
This code is executed concurrently with flush and puts, without locks.
The ideal implementation for performance would use the sub skip list implicitly
pointed by the iterator. Unfortunately the Java API does not offer a method to
get it. So we remember the last keys we iterated to and restore
the reseeked set to at least that point.
*/
iter = segment.tailSet(getHighest(cell, last)).iterator();
current = getNext();
return (current != null);
}
/**
* Seek the scanner at or before the row of specified Cell, it firstly
* tries to seek the scanner at or after the specified Cell, return if
* peek KeyValue of scanner has the same row with specified Cell,
* otherwise seek the scanner at the first Cell of the row which is the
* previous row of specified KeyValue
*
* @param key seek Cell
* @return true if the scanner is at the valid KeyValue, false if such Cell does not exist
*/
@Override
public boolean backwardSeek(Cell key) throws IOException {
seek(key); // seek forward then go backward
if (peek() == null || segment.compareRows(peek(), key) > 0) {
return seekToPreviousRow(key);
}
return true;
}
/**
* Seek the scanner at the first Cell of the row which is the previous row
* of specified key
*
* @param cell seek value
* @return true if the scanner at the first valid Cell of previous row,
* false if not existing such Cell
*/
@Override
public boolean seekToPreviousRow(Cell cell) throws IOException {
boolean keepSeeking = false;
Cell key = cell;
do {
Cell firstKeyOnRow = CellUtil.createFirstOnRow(key);
SortedSet<Cell> cellHead = segment.headSet(firstKeyOnRow);
Cell lastCellBeforeRow = cellHead.isEmpty() ? null : cellHead.last();
if (lastCellBeforeRow == null) {
current = null;
return false;
}
Cell firstKeyOnPreviousRow = CellUtil.createFirstOnRow(lastCellBeforeRow);
this.stopSkippingKVsIfNextRow = true;
seek(firstKeyOnPreviousRow);
this.stopSkippingKVsIfNextRow = false;
if (peek() == null
|| segment.getComparator().compareRows(peek(), firstKeyOnPreviousRow) > 0) {
keepSeeking = true;
key = firstKeyOnPreviousRow;
continue;
} else {
keepSeeking = false;
}
} while (keepSeeking);
return true;
}
/**
* Seek the scanner at the first KeyValue of last row
*
* @return true if scanner has values left, false if the underlying data is empty
*/
@Override
public boolean seekToLastRow() throws IOException {
Cell higherCell = segment.isEmpty() ? null : segment.last();
if (higherCell == null) {
return false;
}
Cell firstCellOnLastRow = CellUtil.createFirstOnRow(higherCell);
if (seek(firstCellOnLastRow)) {
return true;
} else {
return seekToPreviousRow(higherCell);
}
}
@Override protected Segment getSegment() {
return segment;
}
/********************* Private Methods **********************/
/**
* Private internal method for iterating over the segment,
* skipping the cells with irrelevant MVCC
*/
private Cell getNext() {
Cell startKV = current;
Cell next = null;
try {
while (iter.hasNext()) {
next = iter.next();
if (next.getSequenceId() <= this.readPoint) {
return next; // skip irrelevant versions
}
if (stopSkippingKVsIfNextRow && // for backwardSeek() stay in the
startKV != null && // boundaries of a single row
segment.compareRows(next, startKV) > 0) {
return null;
}
} // end of while
return null; // nothing found
} finally {
if (next != null) {
// in all cases, remember the last KV we iterated to, needed for reseek()
last = next;
}
}
}
/**
* Private internal method that returns the higher of the two key values, or null
* if they are both null
*/
private Cell getHighest(Cell first, Cell second) {
if (first == null && second == null) {
return null;
}
if (first != null && second != null) {
int compare = segment.compare(first, second);
return (compare > 0 ? first : second);
}
return (first != null ? first : second);
}
}

View File

@ -0,0 +1,57 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.SortedSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* An abstraction of a mutable segment in memstore, specifically the active segment.
*/
@InterfaceAudience.Private
public abstract class MutableSegment extends Segment {
protected MutableSegment(MemStoreLAB memStoreLAB, long size) {
super(memStoreLAB, size);
}
/**
* Returns a subset of the segment cell set, which starts with the given cell
* @param firstCell a cell in the segment
* @return a subset of the segment cell set, which starts with the given cell
*/
public abstract SortedSet<Cell> tailSet(Cell firstCell);
/**
* Returns the Cell comparator used by this segment
* @return the Cell comparator used by this segment
*/
public abstract CellComparator getComparator();
//methods for test
/**
* Returns the first cell in the segment
* @return the first cell in the segment
*/
abstract Cell first();
}

View File

@ -0,0 +1,218 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.ByteRange;
/**
* This is an abstraction of a segment maintained in a memstore, e.g., the active
* cell set or its snapshot.
*
* This abstraction facilitates the management of the compaction pipeline and the shifts of these
* segments from active set to snapshot set in the default implementation.
*/
@InterfaceAudience.Private
public abstract class Segment {
private volatile MemStoreLAB memStoreLAB;
private final AtomicLong size;
private final TimeRangeTracker timeRangeTracker;
protected volatile boolean tagsPresent;
protected Segment(MemStoreLAB memStoreLAB, long size) {
this.memStoreLAB = memStoreLAB;
this.size = new AtomicLong(size);
this.timeRangeTracker = new TimeRangeTracker();
this.tagsPresent = false;
}
protected Segment(Segment segment) {
this.memStoreLAB = segment.getMemStoreLAB();
this.size = new AtomicLong(segment.getSize());
this.timeRangeTracker = segment.getTimeRangeTracker();
this.tagsPresent = segment.isTagsPresent();
}
/**
* Creates the scanner that is able to scan the concrete segment
* @return a scanner for the given read point
*/
public abstract SegmentScanner getSegmentScanner(long readPoint);
/**
* Returns whether the segment has any cells
* @return whether the segment has any cells
*/
public abstract boolean isEmpty();
/**
* Returns number of cells in segment
* @return number of cells in segment
*/
public abstract int getCellsCount();
/**
* Adds the given cell into the segment
* @return the change in the heap size
*/
public abstract long add(Cell cell);
/**
* Removes the given cell from the segment
* @return the change in the heap size
*/
public abstract long rollback(Cell cell);
/**
* Returns the first cell in the segment that has equal or greater key than the given cell
* @return the first cell in the segment that has equal or greater key than the given cell
*/
public abstract Cell getFirstAfter(Cell cell);
/**
* Returns a set of all cells in the segment
* @return a set of all cells in the segment
*/
public abstract CellSet getCellSet();
/**
* Closing a segment before it is being discarded
*/
public void close() {
MemStoreLAB mslab = getMemStoreLAB();
if(mslab != null) {
mslab.close();
}
// do not set MSLab to null as scanners may still be reading the data here and need to decrease
// the counter when they finish
}
/**
* If the segment has a memory allocator the cell is being cloned to this space, and returned;
* otherwise the given cell is returned
* @return either the given cell or its clone
*/
public Cell maybeCloneWithAllocator(Cell cell) {
if (getMemStoreLAB() == null) {
return cell;
}
int len = KeyValueUtil.length(cell);
ByteRange alloc = getMemStoreLAB().allocateBytes(len);
if (alloc == null) {
// The allocation was too large, allocator decided
// not to do anything with it.
return cell;
}
assert alloc.getBytes() != null;
KeyValueUtil.appendToByteArray(cell, alloc.getBytes(), alloc.getOffset());
KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len);
newKv.setSequenceId(cell.getSequenceId());
return newKv;
}
public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
return (getTimeRangeTracker().includesTimeRange(scan.getTimeRange())
&& (getTimeRangeTracker().getMaximumTimestamp() >=
oldestUnexpiredTS));
}
public long getMinTimestamp() {
return getTimeRangeTracker().getMinimumTimestamp();
}
public boolean isTagsPresent() {
return tagsPresent;
}
public void incScannerCount() {
if(getMemStoreLAB() != null) {
getMemStoreLAB().incScannerCount();
}
}
public void decScannerCount() {
if(getMemStoreLAB() != null) {
getMemStoreLAB().decScannerCount();
}
}
/**
* Setting the heap size of the segment - used to account for different class overheads
* @return this object
*/
public Segment setSize(long size) {
this.size.set(size);
return this;
}
/**
* Returns the heap size of the segment
* @return the heap size of the segment
*/
public long getSize() {
return size.get();
}
/**
* Increases the heap size counter of the segment by the given delta
*/
public void incSize(long delta) {
size.addAndGet(delta);
}
public TimeRangeTracker getTimeRangeTracker() {
return timeRangeTracker;
}
protected void updateMetaInfo(Cell toAdd, long s) {
getTimeRangeTracker().includeTimestamp(toAdd);
size.addAndGet(s);
}
private MemStoreLAB getMemStoreLAB() {
return memStoreLAB;
}
// Debug methods
/**
* Dumps all cells of the segment into the given log
*/
public abstract void dump(Log log);
@Override
public String toString() {
String res = "Store segment of type "+this.getClass().getName()+"; ";
res += "isEmpty "+(isEmpty()?"yes":"no")+"; ";
res += "cellCount "+getCellsCount()+"; ";
res += "size "+getSize()+"; ";
res += "Min ts "+getMinTimestamp()+"; ";
return res;
}
}

View File

@ -0,0 +1,89 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ReflectionUtils;
/**
* A singleton store segment factory.
* Generate concrete store segments.
*/
@InterfaceAudience.Private
public final class SegmentFactory {
static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled";
static final boolean USEMSLAB_DEFAULT = true;
static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class";
private SegmentFactory() {}
private static SegmentFactory instance = new SegmentFactory();
public static SegmentFactory instance() {
return instance;
}
public ImmutableSegment createImmutableSegment(final Configuration conf,
final CellComparator comparator, long size) {
MemStoreLAB memStoreLAB = getMemStoreLAB(conf);
MutableSegment segment = generateMutableSegment(conf, comparator, memStoreLAB, size);
return createImmutableSegment(conf, segment);
}
public ImmutableSegment createImmutableSegment(CellComparator comparator,
long size) {
MutableSegment segment = generateMutableSegment(null, comparator, null, size);
return createImmutableSegment(null, segment);
}
public ImmutableSegment createImmutableSegment(final Configuration conf, MutableSegment segment) {
return generateImmutableSegment(conf, segment);
}
public MutableSegment createMutableSegment(final Configuration conf,
CellComparator comparator, long size) {
MemStoreLAB memStoreLAB = getMemStoreLAB(conf);
return generateMutableSegment(conf, comparator, memStoreLAB, size);
}
//****** private methods to instantiate concrete store segments **********//
private ImmutableSegment generateImmutableSegment(final Configuration conf,
MutableSegment segment) {
// TBD use configuration to set type of segment
return new ImmutableSegmentAdapter(segment);
}
private MutableSegment generateMutableSegment(
final Configuration conf, CellComparator comparator, MemStoreLAB memStoreLAB, long size) {
// TBD use configuration to set type of segment
CellSet set = new CellSet(comparator);
return new MutableCellSetSegment(set, memStoreLAB, size, comparator);
}
private MemStoreLAB getMemStoreLAB(Configuration conf) {
MemStoreLAB memStoreLAB = null;
if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
memStoreLAB = ReflectionUtils.instantiateWithCustomCtor(className,
new Class[] { Configuration.class }, new Object[] { conf });
}
return memStoreLAB;
}
}

View File

@ -0,0 +1,152 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
/**
* An abstraction for store segment scanner.
*/
@InterfaceAudience.Private
public abstract class SegmentScanner implements KeyValueScanner {
private long sequenceID = Long.MAX_VALUE;
protected abstract Segment getSegment();
/**
* Get the sequence id associated with this KeyValueScanner. This is required
* for comparing multiple files (or memstore segments) scanners to find out
* which one has the latest data.
*
*/
@Override
public long getSequenceID() {
return sequenceID;
}
/**
* Close the KeyValue scanner.
*/
@Override
public void close() {
getSegment().decScannerCount();
}
/**
* This functionality should be resolved in the higher level which is
* MemStoreScanner, currently returns true as default. Doesn't throw
* IllegalStateException in order not to change the signature of the
* overridden method
*/
@Override
public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
return true;
}
/**
* This scanner is working solely on the in-memory MemStore therefore this
* interface is not relevant.
*/
@Override
public boolean requestSeek(Cell c, boolean forward, boolean useBloom)
throws IOException {
throw new IllegalStateException(
"requestSeek cannot be called on MutableCellSetSegmentScanner");
}
/**
* This scanner is working solely on the in-memory MemStore and doesn't work on
* store files, MutableCellSetSegmentScanner always does the seek,
* therefore always returning true.
*/
@Override
public boolean realSeekDone() {
return true;
}
/**
* This function should be never called on scanners that always do real seek operations (i.e. most
* of the scanners and also this one). The easiest way to achieve this is to call
* {@link #realSeekDone()} first.
*/
@Override
public void enforceSeek() throws IOException {
throw new IllegalStateException(
"enforceSeek cannot be called on MutableCellSetSegmentScanner");
}
/**
* @return true if this is a file scanner. Otherwise a memory scanner is assumed.
*/
@Override
public boolean isFileScanner() {
return false;
}
/**
* @return the next key in the index (the key to seek to the next block)
* if known, or null otherwise
* Not relevant for in-memory scanner
*/
@Override
public Cell getNextIndexedKey() {
return null;
}
/**
* Called after a batch of rows scanned (RPC) and set to be returned to client. Any in between
* cleanup can be done here. Nothing to be done for MutableCellSetSegmentScanner.
*/
@Override
public void shipped() throws IOException {
// do nothing
}
/**
* Set the sequence id of the scanner.
* This is used to determine an order between memory segment scanners.
* @param x a unique sequence id
*/
public void setSequenceID(long x) {
sequenceID = x;
}
/**
* Returns whether the given scan should seek in this segment
* @return whether the given scan should seek in this segment
*/
public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
return getSegment().shouldSeek(scan,oldestUnexpiredTS);
}
//debug method
@Override
public String toString() {
String res = "Store segment scanner of type "+this.getClass().getName()+"; ";
res += "sequence id "+getSequenceID()+"; ";
res += getSegment().toString();
return res;
}
}

View File

@ -21,8 +21,8 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
/** /**

View File

@ -19,6 +19,27 @@
package org.apache.hadoop.hbase.io; package org.apache.hadoop.hbase.io;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.io.hfile.LruCachedBlock;
import org.apache.hadoop.hbase.regionserver.CellSet;
import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ClassSize;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException; import java.io.IOException;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean; import java.lang.management.RuntimeMXBean;
@ -35,27 +56,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.LruCachedBlock;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.regionserver.CellSkipListSet;
import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ClassSize;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
/** /**
@ -237,8 +237,8 @@ public class TestHeapSize {
assertEquals(expected, actual); assertEquals(expected, actual);
} }
// CellSkipListSet // CellSet
cl = CellSkipListSet.class; cl = CellSet.class;
expected = ClassSize.estimateBase(cl, false); expected = ClassSize.estimateBase(cl, false);
actual = ClassSize.CELL_SKIPLIST_SET; actual = ClassSize.CELL_SKIPLIST_SET;
if (expected != actual) { if (expected != actual) {
@ -305,15 +305,16 @@ public class TestHeapSize {
// DefaultMemStore Deep Overhead // DefaultMemStore Deep Overhead
actual = DefaultMemStore.DEEP_OVERHEAD; actual = DefaultMemStore.DEEP_OVERHEAD;
expected = ClassSize.estimateBase(cl, false); expected = ClassSize.estimateBase(cl, false);
expected += ClassSize.estimateBase(AtomicLong.class, false); expected += (2 * ClassSize.estimateBase(AtomicLong.class, false));
expected += (2 * ClassSize.estimateBase(CellSkipListSet.class, false)); expected += (2 * ClassSize.estimateBase(CellSet.class, false));
expected += (2 * ClassSize.estimateBase(ConcurrentSkipListMap.class, false)); expected += (2 * ClassSize.estimateBase(ConcurrentSkipListMap.class, false));
expected += (2 * ClassSize.estimateBase(TimeRangeTracker.class, false)); expected += (2 * ClassSize.estimateBase(TimeRangeTracker.class, false));
if(expected != actual) { if(expected != actual) {
ClassSize.estimateBase(cl, true); ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(AtomicLong.class, true); ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(CellSkipListSet.class, true); ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(CellSkipListSet.class, true); ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(ConcurrentSkipListMap.class, true); ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
ClassSize.estimateBase(ConcurrentSkipListMap.class, true); ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
ClassSize.estimateBase(TimeRangeTracker.class, true); ClassSize.estimateBase(TimeRangeTracker.class, true);

View File

@ -18,11 +18,7 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import java.util.Iterator;
import java.util.SortedSet;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
@ -32,10 +28,13 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import java.util.Iterator;
import java.util.SortedSet;
@Category({RegionServerTests.class, SmallTests.class}) @Category({RegionServerTests.class, SmallTests.class})
public class TestCellSkipListSet extends TestCase { public class TestCellSkipListSet extends TestCase {
private final CellSkipListSet csls = private final CellSet csls =
new CellSkipListSet(CellComparator.COMPARATOR); new CellSet(CellComparator.COMPARATOR);
protected void setUp() throws Exception { protected void setUp() throws Exception {
super.setUp(); super.setUp();
@ -163,4 +162,4 @@ public class TestCellSkipListSet extends TestCase {
assertTrue(Bytes.equals(head.first().getValueArray(), head.first().getValueOffset(), assertTrue(Bytes.equals(head.first().getValueArray(), head.first().getValueOffset(),
head.first().getValueLength(), value2, 0, value2.length)); head.first().getValueLength(), value2, 0, value2.length));
} }
} }

View File

@ -18,17 +18,10 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import java.io.IOException; import com.google.common.base.Joiner;
import java.lang.management.ManagementFactory; import com.google.common.collect.Iterables;
import java.lang.management.MemoryMXBean; import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -57,12 +50,14 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import com.google.common.base.Joiner; import java.io.IOException;
import com.google.common.collect.Iterables; import java.lang.management.ManagementFactory;
import com.google.common.collect.Lists; import java.lang.management.MemoryMXBean;
import java.util.ArrayList;
import static org.mockito.Mockito.mock; import java.util.Arrays;
import static org.mockito.Mockito.when; import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/** memstore test case */ /** memstore test case */
@Category({RegionServerTests.class, MediumTests.class}) @Category({RegionServerTests.class, MediumTests.class})
@ -89,11 +84,9 @@ public class TestDefaultMemStore extends TestCase {
byte [] other = Bytes.toBytes("somethingelse"); byte [] other = Bytes.toBytes("somethingelse");
KeyValue samekey = new KeyValue(bytes, bytes, bytes, other); KeyValue samekey = new KeyValue(bytes, bytes, bytes, other);
this.memstore.add(samekey); this.memstore.add(samekey);
Cell found = this.memstore.cellSet.first(); Cell found = this.memstore.getActive().first();
assertEquals(1, this.memstore.cellSet.size()); assertEquals(1, this.memstore.getActive().getCellsCount());
assertTrue( assertTrue(Bytes.toString(found.getValueArray()), CellUtil.matchingValue(samekey, found));
Bytes.toString(found.getValueArray(), found.getValueOffset(), found.getValueLength()),
CellUtil.matchingValue(samekey, found));
} }
/** /**
@ -108,7 +101,7 @@ public class TestDefaultMemStore extends TestCase {
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
ScanInfo scanInfo = ScanInfo scanInfo =
new ScanInfo(conf, null, 0, 1, HConstants.LATEST_TIMESTAMP, KeepDeletedCells.FALSE, 0, new ScanInfo(conf, null, 0, 1, HConstants.LATEST_TIMESTAMP, KeepDeletedCells.FALSE, 0,
this.memstore.comparator); this.memstore.getComparator());
ScanType scanType = ScanType.USER_SCAN; ScanType scanType = ScanType.USER_SCAN;
StoreScanner s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners); StoreScanner s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
int count = 0; int count = 0;
@ -476,7 +469,7 @@ public class TestDefaultMemStore extends TestCase {
for (int i = 0; i < snapshotCount; i++) { for (int i = 0; i < snapshotCount; i++) {
addRows(this.memstore); addRows(this.memstore);
runSnapshot(this.memstore); runSnapshot(this.memstore);
assertEquals("History not being cleared", 0, this.memstore.snapshot.size()); assertEquals("History not being cleared", 0, this.memstore.getSnapshot().getCellsCount());
} }
} }
@ -497,7 +490,7 @@ public class TestDefaultMemStore extends TestCase {
m.add(key2); m.add(key2);
assertTrue("Expected memstore to hold 3 values, actually has " + assertTrue("Expected memstore to hold 3 values, actually has " +
m.cellSet.size(), m.cellSet.size() == 3); m.getActive().getCellsCount(), m.getActive().getCellsCount() == 3);
} }
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@ -529,7 +522,7 @@ public class TestDefaultMemStore extends TestCase {
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) { for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE,
KeepDeletedCells.FALSE, 0, this.memstore.comparator); KeepDeletedCells.FALSE, 0, this.memstore.getComparator());
ScanType scanType = ScanType.USER_SCAN; ScanType scanType = ScanType.USER_SCAN;
InternalScanner scanner = new StoreScanner(new Scan( InternalScanner scanner = new StoreScanner(new Scan(
Bytes.toBytes(startRowId)), scanInfo, scanType, null, Bytes.toBytes(startRowId)), scanInfo, scanType, null,
@ -570,12 +563,12 @@ public class TestDefaultMemStore extends TestCase {
memstore.add(new KeyValue(row, fam ,qf3, val)); memstore.add(new KeyValue(row, fam ,qf3, val));
//Creating a snapshot //Creating a snapshot
memstore.snapshot(); memstore.snapshot();
assertEquals(3, memstore.snapshot.size()); assertEquals(3, memstore.getSnapshot().getCellsCount());
//Adding value to "new" memstore //Adding value to "new" memstore
assertEquals(0, memstore.cellSet.size()); assertEquals(0, memstore.getActive().getCellsCount());
memstore.add(new KeyValue(row, fam ,qf4, val)); memstore.add(new KeyValue(row, fam ,qf4, val));
memstore.add(new KeyValue(row, fam ,qf5, val)); memstore.add(new KeyValue(row, fam ,qf5, val));
assertEquals(2, memstore.cellSet.size()); assertEquals(2, memstore.getActive().getCellsCount());
} }
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@ -597,7 +590,7 @@ public class TestDefaultMemStore extends TestCase {
memstore.add(put2); memstore.add(put2);
memstore.add(put3); memstore.add(put3);
assertEquals(3, memstore.cellSet.size()); assertEquals(3, memstore.getActive().getCellsCount());
KeyValue del2 = new KeyValue(row, fam, qf1, ts2, KeyValue.Type.Delete, val); KeyValue del2 = new KeyValue(row, fam, qf1, ts2, KeyValue.Type.Delete, val);
memstore.delete(del2); memstore.delete(del2);
@ -608,9 +601,9 @@ public class TestDefaultMemStore extends TestCase {
expected.add(put2); expected.add(put2);
expected.add(put1); expected.add(put1);
assertEquals(4, memstore.cellSet.size()); assertEquals(4, memstore.getActive().getCellsCount());
int i = 0; int i = 0;
for(Cell cell : memstore.cellSet) { for(Cell cell : memstore.getActive().getCellSet()) {
assertEquals(expected.get(i++), cell); assertEquals(expected.get(i++), cell);
} }
} }
@ -631,7 +624,7 @@ public class TestDefaultMemStore extends TestCase {
memstore.add(put2); memstore.add(put2);
memstore.add(put3); memstore.add(put3);
assertEquals(3, memstore.cellSet.size()); assertEquals(3, memstore.getActive().getCellsCount());
KeyValue del2 = KeyValue del2 =
new KeyValue(row, fam, qf1, ts2, KeyValue.Type.DeleteColumn, val); new KeyValue(row, fam, qf1, ts2, KeyValue.Type.DeleteColumn, val);
@ -644,9 +637,9 @@ public class TestDefaultMemStore extends TestCase {
expected.add(put1); expected.add(put1);
assertEquals(4, memstore.cellSet.size()); assertEquals(4, memstore.getActive().getCellsCount());
int i = 0; int i = 0;
for (Cell cell: memstore.cellSet) { for (Cell cell: memstore.getActive().getCellSet()) {
assertEquals(expected.get(i++), cell); assertEquals(expected.get(i++), cell);
} }
} }
@ -684,9 +677,9 @@ public class TestDefaultMemStore extends TestCase {
assertEquals(5, memstore.cellSet.size()); assertEquals(5, memstore.getActive().getCellsCount());
int i = 0; int i = 0;
for (Cell cell: memstore.cellSet) { for (Cell cell: memstore.getActive().getCellSet()) {
assertEquals(expected.get(i++), cell); assertEquals(expected.get(i++), cell);
} }
} }
@ -700,8 +693,8 @@ public class TestDefaultMemStore extends TestCase {
memstore.add(new KeyValue(row, fam, qf, ts, val)); memstore.add(new KeyValue(row, fam, qf, ts, val));
KeyValue delete = new KeyValue(row, fam, qf, ts, KeyValue.Type.Delete, val); KeyValue delete = new KeyValue(row, fam, qf, ts, KeyValue.Type.Delete, val);
memstore.delete(delete); memstore.delete(delete);
assertEquals(2, memstore.cellSet.size()); assertEquals(2, memstore.getActive().getCellsCount());
assertEquals(delete, memstore.cellSet.first()); assertEquals(delete, memstore.getActive().first());
} }
public void testRetainsDeleteVersion() throws IOException { public void testRetainsDeleteVersion() throws IOException {
@ -713,8 +706,8 @@ public class TestDefaultMemStore extends TestCase {
"row1", "fam", "a", 100, KeyValue.Type.Delete, "dont-care"); "row1", "fam", "a", 100, KeyValue.Type.Delete, "dont-care");
memstore.delete(delete); memstore.delete(delete);
assertEquals(2, memstore.cellSet.size()); assertEquals(2, memstore.getActive().getCellsCount());
assertEquals(delete, memstore.cellSet.first()); assertEquals(delete, memstore.getActive().first());
} }
public void testRetainsDeleteColumn() throws IOException { public void testRetainsDeleteColumn() throws IOException {
// add a put to memstore // add a put to memstore
@ -725,8 +718,8 @@ public class TestDefaultMemStore extends TestCase {
KeyValue.Type.DeleteColumn, "dont-care"); KeyValue.Type.DeleteColumn, "dont-care");
memstore.delete(delete); memstore.delete(delete);
assertEquals(2, memstore.cellSet.size()); assertEquals(2, memstore.getActive().getCellsCount());
assertEquals(delete, memstore.cellSet.first()); assertEquals(delete, memstore.getActive().first());
} }
public void testRetainsDeleteFamily() throws IOException { public void testRetainsDeleteFamily() throws IOException {
// add a put to memstore // add a put to memstore
@ -737,43 +730,8 @@ public class TestDefaultMemStore extends TestCase {
KeyValue.Type.DeleteFamily, "dont-care"); KeyValue.Type.DeleteFamily, "dont-care");
memstore.delete(delete); memstore.delete(delete);
assertEquals(2, memstore.cellSet.size()); assertEquals(2, memstore.getActive().getCellsCount());
assertEquals(delete, memstore.cellSet.first()); assertEquals(delete, memstore.getActive().first());
}
////////////////////////////////////
//Test for timestamps
////////////////////////////////////
/**
* Test to ensure correctness when using Memstore with multiple timestamps
*/
public void testMultipleTimestamps() throws Exception {
long[] timestamps = new long[] {20,10,5,1};
Scan scan = new Scan();
for (long timestamp: timestamps)
addRows(memstore,timestamp);
byte[] fam = Bytes.toBytes("fam");
HColumnDescriptor hcd = mock(HColumnDescriptor.class);
when(hcd.getName()).thenReturn(fam);
Store store = mock(Store.class);
when(store.getFamily()).thenReturn(hcd);
scan.setColumnFamilyTimeRange(fam, 0, 2);
assertTrue(memstore.shouldSeek(scan, store, Long.MIN_VALUE));
scan.setColumnFamilyTimeRange(fam, 20, 82);
assertTrue(memstore.shouldSeek(scan, store, Long.MIN_VALUE));
scan.setColumnFamilyTimeRange(fam, 10, 20);
assertTrue(memstore.shouldSeek(scan, store, Long.MIN_VALUE));
scan.setColumnFamilyTimeRange(fam, 8, 12);
assertTrue(memstore.shouldSeek(scan, store, Long.MIN_VALUE));
scan.setColumnFamilyTimeRange(fam, 28, 42);
assertTrue(!memstore.shouldSeek(scan, store, Long.MIN_VALUE));
} }
//////////////////////////////////// ////////////////////////////////////
@ -795,7 +753,7 @@ public class TestDefaultMemStore extends TestCase {
*/ */
public void testUpsertMSLAB() throws Exception { public void testUpsertMSLAB() throws Exception {
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
conf.setBoolean(DefaultMemStore.USEMSLAB_KEY, true); conf.setBoolean(SegmentFactory.USEMSLAB_KEY, true);
memstore = new DefaultMemStore(conf, CellComparator.COMPARATOR); memstore = new DefaultMemStore(conf, CellComparator.COMPARATOR);
int ROW_SIZE = 2048; int ROW_SIZE = 2048;
@ -838,7 +796,7 @@ public class TestDefaultMemStore extends TestCase {
public void testUpsertMemstoreSize() throws Exception { public void testUpsertMemstoreSize() throws Exception {
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
memstore = new DefaultMemStore(conf, CellComparator.COMPARATOR); memstore = new DefaultMemStore(conf, CellComparator.COMPARATOR);
long oldSize = memstore.size.get(); long oldSize = memstore.size();
List<Cell> l = new ArrayList<Cell>(); List<Cell> l = new ArrayList<Cell>();
KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
@ -849,18 +807,18 @@ public class TestDefaultMemStore extends TestCase {
l.add(kv1); l.add(kv2); l.add(kv3); l.add(kv1); l.add(kv2); l.add(kv3);
this.memstore.upsert(l, 2);// readpoint is 2 this.memstore.upsert(l, 2);// readpoint is 2
long newSize = this.memstore.size.get(); long newSize = this.memstore.size();
assert(newSize > oldSize); assert(newSize > oldSize);
//The kv1 should be removed. //The kv1 should be removed.
assert(memstore.cellSet.size() == 2); assert(memstore.getActive().getCellsCount() == 2);
KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v"); KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v");
kv4.setSequenceId(1); kv4.setSequenceId(1);
l.clear(); l.add(kv4); l.clear(); l.add(kv4);
this.memstore.upsert(l, 3); this.memstore.upsert(l, 3);
assertEquals(newSize, this.memstore.size.get()); assertEquals(newSize, this.memstore.size());
//The kv2 should be removed. //The kv2 should be removed.
assert(memstore.cellSet.size() == 2); assert(memstore.getActive().getCellsCount() == 2);
//this.memstore = null; //this.memstore = null;
} }
@ -1021,10 +979,11 @@ public class TestDefaultMemStore extends TestCase {
private long runSnapshot(final DefaultMemStore hmc) throws UnexpectedStateException { private long runSnapshot(final DefaultMemStore hmc) throws UnexpectedStateException {
// Save off old state. // Save off old state.
int oldHistorySize = hmc.snapshot.size(); int oldHistorySize = hmc.getSnapshot().getCellsCount();
MemStoreSnapshot snapshot = hmc.snapshot(); MemStoreSnapshot snapshot = hmc.snapshot();
// Make some assertions about what just happened. // Make some assertions about what just happened.
assertTrue("History size has not increased", oldHistorySize < hmc.snapshot.size()); assertTrue("History size has not increased", oldHistorySize < hmc.getSnapshot().getCellsCount
());
long t = memstore.timeOfOldestEdit(); long t = memstore.timeOfOldestEdit();
assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE); assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE);
hmc.clearSnapshot(snapshot.getId()); hmc.clearSnapshot(snapshot.getId());

View File

@ -18,20 +18,6 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.security.Key;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListSet;
import javax.crypto.spec.SecretKeySpec;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -74,6 +60,19 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName; import org.junit.rules.TestName;
import org.mockito.Mockito; import org.mockito.Mockito;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.security.Key;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListSet;
@Category(MediumTests.class) @Category(MediumTests.class)
public class TestHMobStore { public class TestHMobStore {
public static final Log LOG = LogFactory.getLog(TestHMobStore.class); public static final Log LOG = LogFactory.getLog(TestHMobStore.class);
@ -468,7 +467,7 @@ public class TestHMobStore {
this.store.snapshot(); this.store.snapshot();
flushStore(store, id++); flushStore(store, id++);
Assert.assertEquals(storeFilesSize, this.store.getStorefiles().size()); Assert.assertEquals(storeFilesSize, this.store.getStorefiles().size());
Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).cellSet.size()); Assert.assertEquals(0, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount());
} }
/** /**

View File

@ -18,54 +18,10 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import com.google.common.collect.ImmutableList;
import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS; import com.google.common.collect.Lists;
import static org.apache.hadoop.hbase.HBaseTestingUtility.FIRST_CHAR; import com.google.common.collect.Maps;
import static org.apache.hadoop.hbase.HBaseTestingUtility.LAST_CHAR; import com.google.protobuf.ByteString;
import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -176,10 +132,52 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import com.google.common.collect.ImmutableList; import java.io.IOException;
import com.google.common.collect.Lists; import java.io.InterruptedIOException;
import com.google.common.collect.Maps; import java.security.PrivilegedExceptionAction;
import com.google.protobuf.ByteString; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS;
import static org.apache.hadoop.hbase.HBaseTestingUtility.FIRST_CHAR;
import static org.apache.hadoop.hbase.HBaseTestingUtility.LAST_CHAR;
import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/** /**
* Basic stand-alone testing of HRegion. No clusters! * Basic stand-alone testing of HRegion. No clusters!
@ -302,8 +300,6 @@ public class TestHRegion {
HBaseTestingUtility.closeRegionAndWAL(region); HBaseTestingUtility.closeRegionAndWAL(region);
} }
/* /*
* This test is for verifying memstore snapshot size is correctly updated in case of rollback * This test is for verifying memstore snapshot size is correctly updated in case of rollback
* See HBASE-10845 * See HBASE-10845
@ -332,7 +328,7 @@ public class TestHRegion {
Path rootDir = new Path(dir + "testMemstoreSnapshotSize"); Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF); MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF);
HRegion region = initHRegion(tableName, null, null, name.getMethodName(), HRegion region = initHRegion(tableName, null, null, name.getMethodName(),
CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES); CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES);
Store store = region.getStore(COLUMN_FAMILY_BYTES); Store store = region.getStore(COLUMN_FAMILY_BYTES);
// Get some random bytes. // Get some random bytes.
@ -1289,7 +1285,8 @@ public class TestHRegion {
private final AtomicInteger count; private final AtomicInteger count;
private Exception e; private Exception e;
GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d, final AtomicInteger c) { GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d,
final AtomicInteger c) {
super("getter." + i); super("getter." + i);
this.g = new Get(r); this.g = new Get(r);
this.done = d; this.done = d;
@ -2452,10 +2449,10 @@ public class TestHRegion {
// This is kinda hacky, but better than nothing... // This is kinda hacky, but better than nothing...
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
DefaultMemStore memstore = (DefaultMemStore) ((HStore) region.getStore(fam1)).memstore; DefaultMemStore memstore = (DefaultMemStore) ((HStore) region.getStore(fam1)).memstore;
Cell firstCell = memstore.cellSet.first(); Cell firstCell = memstore.getActive().first();
assertTrue(firstCell.getTimestamp() <= now); assertTrue(firstCell.getTimestamp() <= now);
now = firstCell.getTimestamp(); now = firstCell.getTimestamp();
for (Cell cell : memstore.cellSet) { for (Cell cell : memstore.getActive().getCellSet()) {
assertTrue(cell.getTimestamp() <= now); assertTrue(cell.getTimestamp() <= now);
now = cell.getTimestamp(); now = cell.getTimestamp();
} }
@ -2782,7 +2779,8 @@ public class TestHRegion {
} catch (NotServingRegionException e) { } catch (NotServingRegionException e) {
// this is the correct exception that is expected // this is the correct exception that is expected
} catch (IOException e) { } catch (IOException e) {
fail("Got wrong type of exception - should be a NotServingRegionException, but was an IOException: " fail("Got wrong type of exception - should be a NotServingRegionException, " +
"but was an IOException: "
+ e.getMessage()); + e.getMessage());
} }
} finally { } finally {
@ -2980,7 +2978,8 @@ public class TestHRegion {
} }
@Test @Test
public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws IOException { public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws
IOException {
byte[] row1 = Bytes.toBytes("row1"); byte[] row1 = Bytes.toBytes("row1");
byte[] fam1 = Bytes.toBytes("fam1"); byte[] fam1 = Bytes.toBytes("fam1");
byte[][] families = { fam1 }; byte[][] families = { fam1 };
@ -4978,7 +4977,8 @@ public class TestHRegion {
// move the file of the primary region to the archive, simulating a compaction // move the file of the primary region to the archive, simulating a compaction
Collection<StoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles(); Collection<StoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles();
primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles); primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles);
Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem().getStoreFiles(families[0]); Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem()
.getStoreFiles(families[0]);
Assert.assertTrue(storeFileInfos == null || storeFileInfos.size() == 0); Assert.assertTrue(storeFileInfos == null || storeFileInfos.size() == 0);
verifyData(secondaryRegion, 0, 1000, cq, families); verifyData(secondaryRegion, 0, 1000, cq, families);
@ -4992,7 +4992,8 @@ public class TestHRegion {
} }
} }
private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws IOException { private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws
IOException {
putData(this.region, startRow, numRows, qf, families); putData(this.region, startRow, numRows, qf, families);
} }
@ -5085,7 +5086,6 @@ public class TestHRegion {
/** /**
* Test that we get the expected flush results back * Test that we get the expected flush results back
* @throws IOException
*/ */
@Test @Test
public void testFlushResult() throws IOException { public void testFlushResult() throws IOException {
@ -5138,11 +5138,6 @@ public class TestHRegion {
} }
/** /**
* @param tableName
* @param callingMethod
* @param conf
* @param families
* @throws IOException
* @return A region on which you must call * @return A region on which you must call
* {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
*/ */
@ -5152,12 +5147,6 @@ public class TestHRegion {
} }
/** /**
* @param tableName
* @param callingMethod
* @param conf
* @param isReadOnly
* @param families
* @throws IOException
* @return A region on which you must call * @return A region on which you must call
* {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
*/ */
@ -5177,14 +5166,6 @@ public class TestHRegion {
} }
/** /**
* @param tableName
* @param startKey
* @param stopKey
* @param callingMethod
* @param conf
* @param isReadOnly
* @param families
* @throws IOException
* @return A region on which you must call * @return A region on which you must call
* {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
*/ */
@ -5676,7 +5657,8 @@ public class TestHRegion {
currRow.clear(); currRow.clear();
hasNext = scanner.next(currRow); hasNext = scanner.next(currRow);
assertEquals(2, currRow.size()); assertEquals(2, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow.get(0).getRowLength(), row4, 0, assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(),
currRow.get(0).getRowLength(), row4, 0,
row4.length)); row4.length));
assertTrue(hasNext); assertTrue(hasNext);
// 2. scan out "row3" (2 kv) // 2. scan out "row3" (2 kv)
@ -6088,7 +6070,7 @@ public class TestHRegion {
public void testOpenRegionWrittenToWALForLogReplay() throws Exception { public void testOpenRegionWrittenToWALForLogReplay() throws Exception {
// similar to the above test but with distributed log replay // similar to the above test but with distributed log replay
final ServerName serverName = ServerName.valueOf("testOpenRegionWrittenToWALForLogReplay", final ServerName serverName = ServerName.valueOf("testOpenRegionWrittenToWALForLogReplay",
100, 42); 100, 42);
final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName)); final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
HTableDescriptor htd HTableDescriptor htd

View File

@ -18,12 +18,6 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@ -36,6 +30,13 @@ import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/** /**
* Test the {@link MemStoreChunkPool} class * Test the {@link MemStoreChunkPool} class
*/ */
@ -47,7 +48,7 @@ public class TestMemStoreChunkPool {
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
conf.setBoolean(DefaultMemStore.USEMSLAB_KEY, true); conf.setBoolean(SegmentFactory.USEMSLAB_KEY, true);
conf.setFloat(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.2f); conf.setFloat(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
chunkPoolDisabledBeforeTest = MemStoreChunkPool.chunkPoolDisabled; chunkPoolDisabledBeforeTest = MemStoreChunkPool.chunkPoolDisabled;
MemStoreChunkPool.chunkPoolDisabled = false; MemStoreChunkPool.chunkPoolDisabled = false;
@ -116,13 +117,13 @@ public class TestMemStoreChunkPool {
// Creating a snapshot // Creating a snapshot
MemStoreSnapshot snapshot = memstore.snapshot(); MemStoreSnapshot snapshot = memstore.snapshot();
assertEquals(3, memstore.snapshot.size()); assertEquals(3, memstore.getSnapshot().getCellsCount());
// Adding value to "new" memstore // Adding value to "new" memstore
assertEquals(0, memstore.cellSet.size()); assertEquals(0, memstore.getActive().getCellsCount());
memstore.add(new KeyValue(row, fam, qf4, val)); memstore.add(new KeyValue(row, fam, qf4, val));
memstore.add(new KeyValue(row, fam, qf5, val)); memstore.add(new KeyValue(row, fam, qf5, val));
assertEquals(2, memstore.cellSet.size()); assertEquals(2, memstore.getActive().getCellsCount());
memstore.clearSnapshot(snapshot.getId()); memstore.clearSnapshot(snapshot.getId());
int chunkCount = chunkPool.getPoolSize(); int chunkCount = chunkPool.getPoolSize();
@ -132,7 +133,7 @@ public class TestMemStoreChunkPool {
@Test @Test
public void testPuttingBackChunksWithOpeningScanner() public void testPuttingBackChunksWithOpeningScanner()
throws UnexpectedStateException { throws IOException {
byte[] row = Bytes.toBytes("testrow"); byte[] row = Bytes.toBytes("testrow");
byte[] fam = Bytes.toBytes("testfamily"); byte[] fam = Bytes.toBytes("testfamily");
byte[] qf1 = Bytes.toBytes("testqualifier1"); byte[] qf1 = Bytes.toBytes("testqualifier1");
@ -153,13 +154,13 @@ public class TestMemStoreChunkPool {
// Creating a snapshot // Creating a snapshot
MemStoreSnapshot snapshot = memstore.snapshot(); MemStoreSnapshot snapshot = memstore.snapshot();
assertEquals(3, memstore.snapshot.size()); assertEquals(3, memstore.getSnapshot().getCellsCount());
// Adding value to "new" memstore // Adding value to "new" memstore
assertEquals(0, memstore.cellSet.size()); assertEquals(0, memstore.getActive().getCellsCount());
memstore.add(new KeyValue(row, fam, qf4, val)); memstore.add(new KeyValue(row, fam, qf4, val));
memstore.add(new KeyValue(row, fam, qf5, val)); memstore.add(new KeyValue(row, fam, qf5, val));
assertEquals(2, memstore.cellSet.size()); assertEquals(2, memstore.getActive().getCellsCount());
// opening scanner before clear the snapshot // opening scanner before clear the snapshot
List<KeyValueScanner> scanners = memstore.getScanners(0); List<KeyValueScanner> scanners = memstore.getScanners(0);

View File

@ -27,6 +27,7 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import com.google.common.collect.Lists;
import java.io.IOException; import java.io.IOException;
import java.lang.ref.SoftReference; import java.lang.ref.SoftReference;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
@ -92,8 +93,6 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName; import org.junit.rules.TestName;
import org.mockito.Mockito; import org.mockito.Mockito;
import com.google.common.collect.Lists;
/** /**
* Test class for the Store * Test class for the Store
*/ */
@ -555,7 +554,7 @@ public class TestStore {
this.store.snapshot(); this.store.snapshot();
flushStore(store, id++); flushStore(store, id++);
Assert.assertEquals(storeFilessize, this.store.getStorefiles().size()); Assert.assertEquals(storeFilessize, this.store.getStorefiles().size());
Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).cellSet.size()); Assert.assertEquals(0, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount());
} }
private void assertCheck() { private void assertCheck() {
@ -600,7 +599,7 @@ public class TestStore {
flushStore(store, id++); flushStore(store, id++);
Assert.assertEquals(1, this.store.getStorefiles().size()); Assert.assertEquals(1, this.store.getStorefiles().size());
// from the one we inserted up there, and a new one // from the one we inserted up there, and a new one
Assert.assertEquals(2, ((DefaultMemStore)this.store.memstore).cellSet.size()); Assert.assertEquals(2, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount());
// how many key/values for this row are there? // how many key/values for this row are there?
Get get = new Get(row); Get get = new Get(row);
@ -669,7 +668,7 @@ public class TestStore {
} }
long computedSize=0; long computedSize=0;
for (Cell cell : ((DefaultMemStore)this.store.memstore).cellSet) { for (Cell cell : ((AbstractMemStore)this.store.memstore).getActive().getCellSet()) {
long kvsize = DefaultMemStore.heapSizeChange(cell, true); long kvsize = DefaultMemStore.heapSizeChange(cell, true);
//System.out.println(kv + " size= " + kvsize + " kvsize= " + kv.heapSize()); //System.out.println(kv + " size= " + kvsize + " kvsize= " + kv.heapSize());
computedSize += kvsize; computedSize += kvsize;
@ -701,7 +700,7 @@ public class TestStore {
// then flush. // then flush.
flushStore(store, id++); flushStore(store, id++);
Assert.assertEquals(1, this.store.getStorefiles().size()); Assert.assertEquals(1, this.store.getStorefiles().size());
Assert.assertEquals(1, ((DefaultMemStore)this.store.memstore).cellSet.size()); Assert.assertEquals(1, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount());
// now increment again: // now increment again:
newValue += 1; newValue += 1;