HBASE-9963 Remove the ReentrantReadWriteLock in the MemStore

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1541880 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
nkeywal 2013-11-14 11:16:20 +00:00
parent e15f36cdde
commit db4f10c208
5 changed files with 127 additions and 166 deletions

View File

@ -37,7 +37,7 @@ import java.util.Map;
/**
* Used to communicate with a single HBase table.
* Obtain an instance from an {@ink HConnection}.
* Obtain an instance from an {@link HConnection}.
*
* @since 0.21.0
*/

View File

@ -332,6 +332,19 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
this.length = length;
}
/**
* Creates a KeyValue from the specified byte array, starting at offset, and
* for length <code>length</code>.
*
* @param bytes byte array
* @param offset offset to start of the KeyValue
* @param length length of the KeyValue
* @param ts
*/
public KeyValue(final byte[] bytes, final int offset, final int length, long ts) {
this(bytes, offset, length, null, 0, 0, null, 0, 0, ts, Type.Maximum, null, 0, 0, null);
}
/** Constructors that build a new backing byte array from fields */
/**

View File

@ -683,7 +683,12 @@ public class HStore implements Store {
* so it has some work to do.
*/
void snapshot() {
this.memstore.snapshot();
this.lock.writeLock().lock();
try {
this.memstore.snapshot();
} finally {
this.lock.writeLock().unlock();
}
}
/**

View File

@ -29,7 +29,6 @@ import java.util.List;
import java.util.NavigableSet;
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -53,6 +52,11 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
* to snapshot and is cleared. We continue to serve edits out of new memstore
* and backing snapshot until flusher reports in that the flush succeeded. At
* this point we let the snapshot go.
* <p>
* The MemStore functions should not be called in parallel. Callers should hold
* write and read locks. This is done in {@link HStore}.
* </p>
*
* TODO: Adjust size of the memstore when we remove items because they have
* been deleted.
* TODO: With new KVSLS, need to make sure we update HeapSize with difference
@ -78,8 +82,6 @@ public class MemStore implements HeapSize {
// Snapshot of memstore. Made for flusher.
volatile KeyValueSkipListSet snapshot;
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
final KeyValue.KVComparator comparator;
// Used to track own heapSize
@ -139,31 +141,26 @@ public class MemStore implements HeapSize {
* To get the snapshot made by this method, use {@link #getSnapshot()}
*/
void snapshot() {
this.lock.writeLock().lock();
try {
// If snapshot currently has entries, then flusher failed or didn't call
// cleanup. Log a warning.
if (!this.snapshot.isEmpty()) {
LOG.warn("Snapshot called again without clearing previous. " +
// If snapshot currently has entries, then flusher failed or didn't call
// cleanup. Log a warning.
if (!this.snapshot.isEmpty()) {
LOG.warn("Snapshot called again without clearing previous. " +
"Doing nothing. Another ongoing flush or did we fail last attempt?");
} else {
if (!this.kvset.isEmpty()) {
this.snapshot = this.kvset;
this.kvset = new KeyValueSkipListSet(this.comparator);
this.snapshotTimeRangeTracker = this.timeRangeTracker;
this.timeRangeTracker = new TimeRangeTracker();
// Reset heap to not include any keys
this.size.set(DEEP_OVERHEAD);
this.snapshotAllocator = this.allocator;
// Reset allocator so we get a fresh buffer for the new memstore
if (allocator != null) {
this.allocator = new MemStoreLAB(conf, chunkPool);
}
timeOfOldestEdit = Long.MAX_VALUE;
} else {
if (!this.kvset.isEmpty()) {
this.snapshot = this.kvset;
this.kvset = new KeyValueSkipListSet(this.comparator);
this.snapshotTimeRangeTracker = this.timeRangeTracker;
this.timeRangeTracker = new TimeRangeTracker();
// Reset heap to not include any keys
this.size.set(DEEP_OVERHEAD);
this.snapshotAllocator = this.allocator;
// Reset allocator so we get a fresh buffer for the new memstore
if (allocator != null) {
this.allocator = new MemStoreLAB(conf, chunkPool);
}
timeOfOldestEdit = Long.MAX_VALUE;
}
} finally {
this.lock.writeLock().unlock();
}
}
@ -188,24 +185,19 @@ public class MemStore implements HeapSize {
void clearSnapshot(final SortedSet<KeyValue> ss)
throws UnexpectedException {
MemStoreLAB tmpAllocator = null;
this.lock.writeLock().lock();
try {
if (this.snapshot != ss) {
throw new UnexpectedException("Current snapshot is " +
if (this.snapshot != ss) {
throw new UnexpectedException("Current snapshot is " +
this.snapshot + ", was passed " + ss);
}
// OK. Passed in snapshot is same as current snapshot. If not-empty,
// create a new snapshot and let the old one go.
if (!ss.isEmpty()) {
this.snapshot = new KeyValueSkipListSet(this.comparator);
this.snapshotTimeRangeTracker = new TimeRangeTracker();
}
if (this.snapshotAllocator != null) {
tmpAllocator = this.snapshotAllocator;
this.snapshotAllocator = null;
}
} finally {
this.lock.writeLock().unlock();
}
// OK. Passed in snapshot is same as current snapshot. If not-empty,
// create a new snapshot and let the old one go.
if (!ss.isEmpty()) {
this.snapshot = new KeyValueSkipListSet(this.comparator);
this.snapshotTimeRangeTracker = new TimeRangeTracker();
}
if (this.snapshotAllocator != null) {
tmpAllocator = this.snapshotAllocator;
this.snapshotAllocator = null;
}
if (tmpAllocator != null) {
tmpAllocator.close();
@ -218,13 +210,8 @@ public class MemStore implements HeapSize {
* @return approximate size of the passed key and value.
*/
long add(final KeyValue kv) {
this.lock.readLock().lock();
try {
KeyValue toAdd = maybeCloneWithAllocator(kv);
return internalAdd(toAdd);
} finally {
this.lock.readLock().unlock();
}
KeyValue toAdd = maybeCloneWithAllocator(kv);
return internalAdd(toAdd);
}
long timeOfOldestEdit() {
@ -274,7 +261,7 @@ public class MemStore implements HeapSize {
// not to do anything with it.
return kv;
}
assert alloc != null && alloc.getData() != null;
assert alloc.getData() != null;
System.arraycopy(kv.getBuffer(), kv.getOffset(), alloc.getData(), alloc.getOffset(), len);
KeyValue newKv = new KeyValue(alloc.getData(), alloc.getOffset(), len);
newKv.setMvccVersion(kv.getMvccVersion());
@ -290,26 +277,21 @@ public class MemStore implements HeapSize {
* @param kv
*/
void rollback(final KeyValue kv) {
this.lock.readLock().lock();
try {
// If the key is in the snapshot, delete it. We should not update
// this.size, because that tracks the size of only the memstore and
// not the snapshot. The flush of this snapshot to disk has not
// yet started because Store.flush() waits for all rwcc transactions to
// commit before starting the flush to disk.
KeyValue found = this.snapshot.get(kv);
if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
this.snapshot.remove(kv);
}
// If the key is in the memstore, delete it. Update this.size.
found = this.kvset.get(kv);
if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
removeFromKVSet(kv);
long s = heapSizeChange(kv, true);
this.size.addAndGet(-s);
}
} finally {
this.lock.readLock().unlock();
// If the key is in the snapshot, delete it. We should not update
// this.size, because that tracks the size of only the memstore and
// not the snapshot. The flush of this snapshot to disk has not
// yet started because Store.flush() waits for all rwcc transactions to
// commit before starting the flush to disk.
KeyValue found = this.snapshot.get(kv);
if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
this.snapshot.remove(kv);
}
// If the key is in the memstore, delete it. Update this.size.
found = this.kvset.get(kv);
if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
removeFromKVSet(kv);
long s = heapSizeChange(kv, true);
this.size.addAndGet(-s);
}
}
@ -320,14 +302,9 @@ public class MemStore implements HeapSize {
*/
long delete(final KeyValue delete) {
long s = 0;
this.lock.readLock().lock();
try {
KeyValue toAdd = maybeCloneWithAllocator(delete);
s += heapSizeChange(toAdd, addToKVSet(toAdd));
timeRangeTracker.includeTimestamp(toAdd);
} finally {
this.lock.readLock().unlock();
}
KeyValue toAdd = maybeCloneWithAllocator(delete);
s += heapSizeChange(toAdd, addToKVSet(toAdd));
timeRangeTracker.includeTimestamp(toAdd);
this.size.addAndGet(s);
return s;
}
@ -338,12 +315,7 @@ public class MemStore implements HeapSize {
* @return Next row or null if none found.
*/
KeyValue getNextRow(final KeyValue kv) {
this.lock.readLock().lock();
try {
return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot));
} finally {
this.lock.readLock().unlock();
}
return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot));
}
/*
@ -387,13 +359,8 @@ public class MemStore implements HeapSize {
* @param state column/delete tracking state
*/
void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
this.lock.readLock().lock();
try {
getRowKeyAtOrBefore(kvset, state);
getRowKeyAtOrBefore(snapshot, state);
} finally {
this.lock.readLock().unlock();
}
getRowKeyAtOrBefore(kvset, state);
getRowKeyAtOrBefore(snapshot, state);
}
/*
@ -459,7 +426,8 @@ public class MemStore implements HeapSize {
// Stop looking if we've exited the better candidate range.
if (!state.isBetterCandidate(p.kv)) break;
// Make into firstOnRow
firstOnRow = new KeyValue(p.kv.getRow(), HConstants.LATEST_TIMESTAMP);
firstOnRow = new KeyValue(p.kv.getRowArray(), p.kv.getRowOffset(), p.kv.getRowLength(),
HConstants.LATEST_TIMESTAMP);
// If we find something, break;
if (walkForwardInSingleRow(p.set, firstOnRow, state)) break;
}
@ -487,54 +455,46 @@ public class MemStore implements HeapSize {
byte[] qualifier,
long newValue,
long now) {
this.lock.readLock().lock();
try {
KeyValue firstKv = KeyValue.createFirstOnRow(
row, family, qualifier);
// Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
SortedSet<KeyValue> snSs = snapshot.tailSet(firstKv);
if (!snSs.isEmpty()) {
KeyValue snKv = snSs.first();
// is there a matching KV in the snapshot?
if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
if (snKv.getTimestamp() == now) {
// poop,
now += 1;
}
KeyValue firstKv = KeyValue.createFirstOnRow(
row, family, qualifier);
// Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
SortedSet<KeyValue> snSs = snapshot.tailSet(firstKv);
if (!snSs.isEmpty()) {
KeyValue snKv = snSs.first();
// is there a matching KV in the snapshot?
if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
if (snKv.getTimestamp() == now) {
// poop,
now += 1;
}
}
// logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
// But the timestamp should also be max(now, mostRecentTsInMemstore)
// so we cant add the new KV w/o knowing what's there already, but we also
// want to take this chance to delete some kvs. So two loops (sad)
SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
Iterator<KeyValue> it = ss.iterator();
while ( it.hasNext() ) {
KeyValue kv = it.next();
// if this isnt the row we are interested in, then bail:
if (!kv.matchingColumn(family,qualifier) || !kv.matchingRow(firstKv) ) {
break; // rows dont match, bail.
}
// if the qualifier matches and it's a put, just RM it out of the kvset.
if (kv.getType() == KeyValue.Type.Put.getCode() &&
kv.getTimestamp() > now && firstKv.matchingQualifier(kv)) {
now = kv.getTimestamp();
}
}
// create or update (upsert) a new KeyValue with
// 'now' and a 0 memstoreTS == immediately visible
List<Cell> cells = new ArrayList<Cell>(1);
cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
return upsert(cells, 1L);
} finally {
this.lock.readLock().unlock();
}
// logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
// But the timestamp should also be max(now, mostRecentTsInMemstore)
// so we cant add the new KV w/o knowing what's there already, but we also
// want to take this chance to delete some kvs. So two loops (sad)
SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
for (KeyValue kv : ss) {
// if this isnt the row we are interested in, then bail:
if (!kv.matchingColumn(family, qualifier) || !kv.matchingRow(firstKv)) {
break; // rows dont match, bail.
}
// if the qualifier matches and it's a put, just RM it out of the kvset.
if (kv.getTypeByte() == KeyValue.Type.Put.getCode() &&
kv.getTimestamp() > now && firstKv.matchingQualifier(kv)) {
now = kv.getTimestamp();
}
}
// create or update (upsert) a new KeyValue with
// 'now' and a 0 memstoreTS == immediately visible
List<Cell> cells = new ArrayList<Cell>(1);
cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
return upsert(cells, 1L);
}
/**
@ -556,16 +516,11 @@ public class MemStore implements HeapSize {
* @return change in memstore size
*/
public long upsert(Iterable<Cell> cells, long readpoint) {
this.lock.readLock().lock();
try {
long size = 0;
for (Cell cell : cells) {
size += upsert(cell, readpoint);
}
return size;
} finally {
this.lock.readLock().unlock();
long size = 0;
for (Cell cell : cells) {
size += upsert(cell, readpoint);
}
return size;
}
/**
@ -612,7 +567,8 @@ public class MemStore implements HeapSize {
// check that this is the row and column we are interested in, otherwise bail
if (kv.matchingRow(cur) && kv.matchingQualifier(cur)) {
// only remove Puts that concurrent scanners cannot possibly see
if (cur.getType() == KeyValue.Type.Put.getCode() && cur.getMvccVersion() <= readpoint) {
if (cur.getTypeByte() == KeyValue.Type.Put.getCode() &&
cur.getMvccVersion() <= readpoint) {
if (versionsVisible > 1) {
// if we get here we have seen at least one version visible to the oldest scanner,
// which means we can prove that no scanner will see this version
@ -675,13 +631,8 @@ public class MemStore implements HeapSize {
* @return scanner on memstore and snapshot in this order.
*/
List<KeyValueScanner> getScanners(long readPt) {
this.lock.readLock().lock();
try {
return Collections.<KeyValueScanner>singletonList(
new MemStoreScanner(readPt));
} finally {
this.lock.readLock().unlock();
}
return Collections.<KeyValueScanner>singletonList(
new MemStoreScanner(readPt));
}
/**
@ -959,16 +910,12 @@ public class MemStore implements HeapSize {
}
public final static long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT + (11 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
(2 * ClassSize.TIMERANGE_TRACKER) +
ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
(2 * ClassSize.KEYVALUE_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
/** Used for readability when we don't store memstore timestamp in HFile */
public static final boolean NO_PERSISTENT_TS = false;
/*
* Calculate how the MemStore size has changed. Includes overhead of the
* backing Map.

View File

@ -40,7 +40,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
@ -49,7 +48,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.KeyValueSkipListSet;
import org.apache.hadoop.hbase.regionserver.MemStore;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.ClassSize;
import org.junit.BeforeClass;
@ -305,14 +303,12 @@ public class TestHeapSize {
// MemStore Deep Overhead
actual = MemStore.DEEP_OVERHEAD;
expected = ClassSize.estimateBase(cl, false);
expected += ClassSize.estimateBase(ReentrantReadWriteLock.class, false);
expected += ClassSize.estimateBase(AtomicLong.class, false);
expected += (2 * ClassSize.estimateBase(KeyValueSkipListSet.class, false));
expected += (2 * ClassSize.estimateBase(ConcurrentSkipListMap.class, false));
expected += (2 * ClassSize.estimateBase(TimeRangeTracker.class, false));
if(expected != actual) {
ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(ReentrantReadWriteLock.class, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(KeyValueSkipListSet.class, true);
ClassSize.estimateBase(KeyValueSkipListSet.class, true);