Patches to alleviate HBASE-790 and HBASE-792

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@681822 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2008-08-01 20:59:52 +00:00
parent 70e8c417d0
commit b3bdd512f3
5 changed files with 145 additions and 125 deletions

View File

@@ -263,8 +263,8 @@ public class HStoreKey implements WritableComparable {
/** {@inheritDoc} */
@Override
public int hashCode() {
int result = this.row.hashCode();
result ^= this.column.hashCode();
int result = Bytes.hashCode(this.row);
result ^= Bytes.hashCode(this.column);
result ^= this.timestamp;
return result;
}
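
The switch to Bytes.hashCode matters because calling hashCode() directly on a Java byte[] uses identity hashing, so two keys holding equal row and column bytes could hash differently. A minimal standalone sketch of the same idea, using java.util.Arrays.hashCode as a stand-in for the Bytes helper (names outside the patch are illustrative):

import java.util.Arrays;

// Sketch: byte[].hashCode() is identity-based, so equal contents do not imply
// equal hash codes; a content hash (what Bytes.hashCode is assumed to provide)
// is what lets HStoreKey behave correctly in hashed collections.
public class RowKeyHashSketch {
  public static void main(String[] args) {
    byte[] a = "row1".getBytes();
    byte[] b = "row1".getBytes();
    System.out.println(a.hashCode() == b.hashCode());             // usually false (identity hash)
    System.out.println(Arrays.hashCode(a) == Arrays.hashCode(b)); // true (content hash)
  }
}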

View File

@@ -317,7 +317,7 @@ public class HRegion implements HConstants {
new ConcurrentHashMap<Integer, byte []>();
private final Map<Integer, TreeMap<HStoreKey, byte []>> targetColumns =
new ConcurrentHashMap<Integer, TreeMap<HStoreKey, byte []>>();
private volatile boolean flushRequested;
private volatile boolean flushRequested = false;
// Default access because read by tests.
final Map<Integer, HStore> stores = new ConcurrentHashMap<Integer, HStore>();
final AtomicLong memcacheSize = new AtomicLong(0);
@@ -439,7 +439,6 @@ public class HRegion implements HConstants {
this.conf = conf;
this.regionInfo = regionInfo;
this.flushListener = flushListener;
this.flushRequested = false;
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
String encodedNameStr = Integer.toString(this.regionInfo.getEncodedName());
this.regiondir = new Path(basedir, encodedNameStr);
@@ -1193,10 +1192,9 @@ public class HRegion implements HConstants {
storeSet.add(store);
}
}
}
else
} else {
storeSet.addAll(stores.values());
}
// For each column name that is just a column family, open the store
// related to it and fetch everything for that row. HBASE-631
// Also remove each store from storeSet so that these stores
@@ -1427,8 +1425,8 @@ public class HRegion implements HConstants {
*/
private synchronized void checkResources() {
boolean blocked = false;
while (this.memcacheSize.get() >= this.blockingMemcacheSize) {
while (this.memcacheSize.get() > this.blockingMemcacheSize) {
requestFlush();
if (!blocked) {
LOG.info("Blocking updates for '" + Thread.currentThread().getName() +
"' on region " + Bytes.toString(getRegionName()) + ": Memcache size " +
@@ -1436,7 +1434,6 @@ public class HRegion implements HConstants {
" is >= than blocking " +
StringUtils.humanReadableInt(this.blockingMemcacheSize) + " size");
}
blocked = true;
try {
wait(threadWakeFrequency);
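
The checkResources loop above implements write backpressure: while the memcache is over the blocking threshold it asks for a flush and parks the caller on wait(), to be woken once a flush has drained memory. A hedged, self-contained sketch of that pattern (field names and thresholds are illustrative, not the actual HRegion members):

import java.util.concurrent.atomic.AtomicLong;

class BackpressureSketch {
  private final AtomicLong memcacheSize = new AtomicLong(0);
  private final long blockingSize = 64L * 1024 * 1024; // assumed blocking threshold
  private final long wakeFrequencyMs = 10 * 1000;      // mirrors threadWakeFrequency

  // Callers block here while the in-memory size is over the blocking limit.
  synchronized void checkResources() throws InterruptedException {
    while (memcacheSize.get() > blockingSize) {
      requestFlush();
      wait(wakeFrequencyMs); // re-check periodically even without a notify
    }
  }

  void requestFlush() {
    // In the real code this hands the region to a flush listener once per cycle.
  }

  // Called after a flush has drained bytes from the memcache.
  synchronized void flushCompleted(long flushedBytes) {
    memcacheSize.addAndGet(-flushedBytes);
    notifyAll(); // release any blocked writers
  }
}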
@@ -1610,16 +1607,34 @@ public class HRegion implements HConstants {
getStore(key.getColumn()).add(key, e.getValue()));
}
flush = this.flushListener != null && !this.flushRequested &&
size > this.memcacheFlushSize;
isFlushSize(size);
} finally {
this.updatesLock.readLock().unlock();
}
if (flush) {
// Request a cache flush. Do it outside update lock.
this.flushListener.request(this);
this.flushRequested = true;
requestFlush();
}
}
private void requestFlush() {
if (this.flushListener == null || this.flushRequested) {
return;
}
this.flushListener.request(this);
this.flushRequested = true;
if (LOG.isDebugEnabled()) {
LOG.debug("Flush requested on " + this);
}
}
/*
* @param size
* @return True if size is over the flush threshold
*/
private boolean isFlushSize(final long size) {
return size > this.memcacheFlushSize;
}
// Do any reconstruction needed from the log
@SuppressWarnings("unused")
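
The new requestFlush()/isFlushSize() helpers centralize two decisions: a flush is requested at most once until the listener services it (the flushRequested flag), and the trigger is simply the memcache size crossing the configured flush threshold. A small sketch of that request-once pattern, with illustrative names rather than the actual HRegion fields:

class FlushRequestSketch {
  interface FlushListener { void request(FlushRequestSketch region); }

  private final FlushListener flushListener;
  private final long memcacheFlushSize = 16L * 1024 * 1024; // assumed flush threshold
  private volatile boolean flushRequested = false;

  FlushRequestSketch(FlushListener listener) {
    this.flushListener = listener;
  }

  boolean isFlushSize(long size) {
    return size > memcacheFlushSize;
  }

  // Ask for a flush at most once until the flusher actually runs.
  void requestFlush() {
    if (flushListener == null || flushRequested) {
      return;
    }
    flushListener.request(this);
    flushRequested = true;
  }

  // The flusher would clear the flag when it picks up the request.
  void flushStarted() {
    flushRequested = false;
  }
}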

View File

@@ -608,16 +608,10 @@ public class HStore implements HConstants {
HStoreKey curkey = es.getKey();
byte[] bytes = es.getValue();
if (HStoreKey.matchingFamily(this.family.getName(), curkey.getColumn())) {
if (ttl == HConstants.FOREVER ||
now < curkey.getTimestamp() + ttl) {
if (!isExpired(curkey, ttl, now)) {
entries++;
out.append(curkey, new ImmutableBytesWritable(bytes));
flushed += curkey.getSize() + (bytes == null ? 0 : bytes.length);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("internalFlushCache: " + curkey +
": expired, skipped");
}
}
}
}
@@ -930,12 +924,8 @@ public class HStore implements HConstants {
if (sk.getRow().length != 0 && sk.getColumn().length != 0) {
// Only write out objects which have a non-zero length key and
// value
if (ttl == HConstants.FOREVER || now < sk.getTimestamp() + ttl) {
if (!isExpired(sk, ttl, now)) {
compactedOut.append(sk, vals[smallestKey]);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("compactHStoreFiles: " + sk + ": expired, deleted");
}
}
}
}
@@ -1197,17 +1187,12 @@ public class HStore implements HConstants {
// there aren't any pending deletes.
if (!(deletes.containsKey(readcol) &&
deletes.get(readcol).longValue() >= readkey.getTimestamp())) {
if (ttl == HConstants.FOREVER ||
now < readkey.getTimestamp() + ttl) {
if (!isExpired(readkey, ttl, now)) {
results.put(readcol,
new Cell(readval.get(), readkey.getTimestamp()));
// need to reinstantiate the readval so we can reuse it,
// otherwise next iteration will destroy our result
readval = new ImmutableBytesWritable();
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("getFullFromMapFile: " + readkey + ": expired, skipped");
}
}
}
}
@@ -1259,7 +1244,7 @@ public class HStore implements HConstants {
// the old non-delete cell value in a later store file. If we don't keep
// around the fact that the cell was deleted in a newer record, we end up
// returning the old value if user is asking for more than one version.
// This List of deletes should not large since we are only keeping rows
// This List of deletes should not be large since we are only keeping rows
// and columns that match those set on the scanner and which have delete
// values. If memory usage becomes an issue, could redo as bloom filter.
Map<byte [], List<Long>> deletes =
@@ -1283,13 +1268,8 @@ public class HStore implements HConstants {
continue;
}
if (!isDeleted(readkey, readval.get(), true, deletes)) {
if (ttl == HConstants.FOREVER ||
now < readkey.getTimestamp() + ttl) {
if (!isExpired(readkey, ttl, now)) {
results.add(new Cell(readval.get(), readkey.getTimestamp()));
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("get: " + readkey + ": expired, skipped");
}
}
// Perhaps only one version is wanted. I could let this
// test happen later in the for loop test but it would cost
@@ -1304,13 +1284,8 @@ public class HStore implements HConstants {
!hasEnoughVersions(numVersions, results);
readval = new ImmutableBytesWritable()) {
if (!isDeleted(readkey, readval.get(), true, deletes)) {
if (ttl == HConstants.FOREVER ||
now < readkey.getTimestamp() + ttl) {
if (!isExpired(readkey, ttl, now)) {
results.add(new Cell(readval.get(), readkey.getTimestamp()));
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("get: " + readkey + ": expired, skipped");
}
}
}
}
@@ -1393,14 +1368,8 @@ public class HStore implements HConstants {
// in the memcache
if (!isDeleted(readkey, readval.get(), false, null) &&
!keys.contains(readkey)) {
if (ttl == HConstants.FOREVER ||
now < readkey.getTimestamp() + ttl) {
if (!isExpired(readkey, ttl, now)) {
keys.add(new HStoreKey(readkey));
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("getKeys: " + readkey +
": expired, skipped");
}
}
// if we've collected enough versions, then exit the loop.
@@ -1439,24 +1408,34 @@ public class HStore implements HConstants {
byte [] getRowKeyAtOrBefore(final byte [] row)
throws IOException{
// Map of HStoreKeys that are candidates for holding the row key that
// most closely matches what we're looking for. We'll have to update it
// deletes found all over the place as we go along before finally reading
// the best key out of it at the end.
// most closely matches what we're looking for. We'll have to update it as
// deletes are found all over the place as we go along before finally
// reading the best key out of it at the end.
SortedMap<HStoreKey, Long> candidateKeys = new TreeMap<HStoreKey, Long>();
// Obtain read lock
// Keep a list of deleted cell keys. We need this because as we go through
// the store files, the cell with the delete marker may be in one file and
// the old non-delete cell value in a later store file. If we don't keep
// around the fact that the cell was deleted in a newer record, we end up
// returning the old value if user is asking for more than one version.
// This List of deletes should not be large since we are only keeping rows
// and columns that match those set on the scanner and which have delete
// values. If memory usage becomes an issue, could redo as bloom filter.
Set<HStoreKey> deletes = new HashSet<HStoreKey>();
this.lock.readLock().lock();
try {
// Process each store file. Run through from oldest to newest so deletes
// have chance to overshadow deleted cells
// First go to the memcache. Pick up deletes and candidates.
this.memcache.getRowKeyAtOrBefore(row, candidateKeys, deletes);
// Process each store file. Run through from newest to oldest.
// This code below is very close to the body of the getKeys method.
MapFile.Reader[] maparray = getReaders();
for (int i = 0; i < maparray.length; i++) {
for(int i = maparray.length - 1; i >= 0; i--) {
// Update the candidate keys from the current map file
rowAtOrBeforeFromMapFile(maparray[i], row, candidateKeys);
rowAtOrBeforeFromMapFile(maparray[i], row, candidateKeys, deletes);
}
// Finally, check the memcache
this.memcache.getRowKeyAtOrBefore(row, candidateKeys);
// Return the best key from candidateKeys
return candidateKeys.isEmpty()? null: candidateKeys.lastKey().getRow();
} finally {
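
The Set<HStoreKey> of deletes introduced above exists because store files are now walked newest to oldest: a delete marker found in a newer file has to suppress the older, still-present value of the same cell in files read later. A simplified sketch of that bookkeeping, with string keys and a null value standing in for HStoreKey and a delete marker:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class DeleteTrackingSketch {
  // storeFiles are ordered newest first; a null value marks a deleted cell.
  static List<String> liveCells(List<Map<String, byte[]>> storeFilesNewestFirst) {
    Set<String> deletes = new HashSet<>();
    List<String> results = new ArrayList<>();
    for (Map<String, byte[]> storeFile : storeFilesNewestFirst) {
      for (Map.Entry<String, byte[]> e : storeFile.entrySet()) {
        if (e.getValue() == null) {
          deletes.add(e.getKey());           // remember the delete for older files
        } else if (!deletes.contains(e.getKey())) {
          results.add(e.getKey());           // only keep cells not deleted in a newer file
        }
      }
    }
    return results;
  }
}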
@@ -1472,8 +1451,9 @@ public class HStore implements HConstants {
* @param candidateKeys
* @throws IOException
*/
private void rowAtOrBeforeFromMapFile(MapFile.Reader map, final byte [] row,
SortedMap<HStoreKey, Long> candidateKeys)
private void rowAtOrBeforeFromMapFile(final MapFile.Reader map,
final byte [] row, final SortedMap<HStoreKey, Long> candidateKeys,
final Set<HStoreKey> deletes)
throws IOException {
HStoreKey startKey = new HStoreKey();
ImmutableBytesWritable startValue = new ImmutableBytesWritable();
@@ -1491,9 +1471,9 @@ public class HStore implements HConstants {
long now = System.currentTimeMillis();
// if there aren't any candidate keys yet, we'll do some things different
if (candidateKeys.isEmpty()) {
rowAtOrBeforeCandidate(startKey, map, row, candidateKeys, now);
rowAtOrBeforeCandidate(startKey, map, row, candidateKeys, deletes, now);
} else {
rowAtOrBeforeWithCandidates(startKey, map, row, candidateKeys,
rowAtOrBeforeWithCandidates(startKey, map, row, candidateKeys, deletes,
now);
}
}
@@ -1510,7 +1490,8 @@ public class HStore implements HConstants {
*/
private void rowAtOrBeforeCandidate(final HStoreKey startKey,
final MapFile.Reader map, final byte[] row,
final SortedMap<HStoreKey, Long> candidateKeys, final long now)
final SortedMap<HStoreKey, Long> candidateKeys,
final Set<HStoreKey> deletes, final long now)
throws IOException {
// if the row we're looking for is past the end of this mapfile, set the
// search key to be the last key. If its a deleted key, then we'll back
@@ -1525,9 +1506,31 @@ public class HStore implements HConstants {
searchKey = startKey;
}
}
rowAtOrBeforeCandidate(map, searchKey, candidateKeys, now);
rowAtOrBeforeCandidate(map, searchKey, candidateKeys, deletes, now);
}
/*
* @param ttlSetting
* @param hsk
* @param now
* @param deletes
* @return True if key has not expired and is not in passed set of deletes.
*/
static boolean notExpiredAndNotInDeletes(final long ttl,
final HStoreKey hsk, final long now, final Set<HStoreKey> deletes) {
return !isExpired(hsk, ttl, now) && !deletes.contains(hsk);
}
private static boolean isExpired(final HStoreKey hsk, final long ttl,
final long now) {
boolean result = ttl != HConstants.FOREVER && now > hsk.getTimestamp() + ttl;
if (LOG.isDebugEnabled()) {
LOG.debug("rowAtOrBeforeCandidate 1:" + hsk +
": expired, skipped");
}
return result;
}
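
The extracted isExpired()/notExpiredAndNotInDeletes() helpers reduce the repeated inline checks to one predicate: a cell is expired only when a TTL is configured and its timestamp plus that TTL already lies in the past. A standalone sketch of the same arithmetic (the FOREVER sentinel below is an assumed stand-in for HConstants.FOREVER):

class TtlSketch {
  static final long FOREVER = -1L; // assumed sentinel meaning "no TTL configured"

  static boolean isExpired(long cellTimestamp, long ttlMillis, long now) {
    return ttlMillis != FOREVER && now > cellTimestamp + ttlMillis;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    System.out.println(isExpired(now - 5_000, 1_000, now));   // true: written 5s ago, 1s TTL
    System.out.println(isExpired(now - 500, 1_000, now));     // false: still within TTL
    System.out.println(isExpired(now - 5_000, FOREVER, now)); // false: TTL disabled
  }
}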
/* Find a candidate for row that is at or before passed key, sk, in mapfile.
* @param map
* @param sk Key to go search the mapfile with.
@@ -1538,7 +1541,7 @@ public class HStore implements HConstants {
*/
private void rowAtOrBeforeCandidate(final MapFile.Reader map,
final HStoreKey sk, final SortedMap<HStoreKey, Long> candidateKeys,
final long now)
final Set<HStoreKey> deletes, final long now)
throws IOException {
HStoreKey searchKey = sk;
HStoreKey readkey = new HStoreKey();
@@ -1557,19 +1560,16 @@ public class HStore implements HConstants {
// as a candidate key
if (Bytes.equals(readkey.getRow(), searchKey.getRow())) {
if (!HLogEdit.isDeleted(readval.get())) {
if (ttl == HConstants.FOREVER ||
now < readkey.getTimestamp() + ttl) {
if (notExpiredAndNotInDeletes(this.ttl, readkey, now, deletes)) {
candidateKeys.put(stripTimestamp(readkey),
new Long(readkey.getTimestamp()));
foundCandidate = true;
// NOTE! Continue.
continue;
}
if (LOG.isDebugEnabled()) {
LOG.debug("rowAtOrBeforeCandidate 1:" + readkey +
": expired, skipped");
}
}
// Deleted value.
deletes.add(readkey);
if (deletedOrExpiredRow == null) {
deletedOrExpiredRow = new HStoreKey(readkey);
}
@@ -1582,18 +1582,14 @@ public class HStore implements HConstants {
// we're seeking yet, so this row is a candidate for closest
// (assuming that it isn't a delete).
if (!HLogEdit.isDeleted(readval.get())) {
if (ttl == HConstants.FOREVER ||
now < readkey.getTimestamp() + ttl) {
if (notExpiredAndNotInDeletes(this.ttl, readkey, now, deletes)) {
candidateKeys.put(stripTimestamp(readkey),
new Long(readkey.getTimestamp()));
foundCandidate = true;
continue;
}
if (LOG.isDebugEnabled()) {
LOG.debug("rowAtOrBeforeCandidate 2:" + readkey +
": expired, skipped");
}
}
deletes.add(readkey);
if (deletedOrExpiredRow == null) {
deletedOrExpiredRow = new HStoreKey(readkey);
}
@@ -1619,7 +1615,8 @@ public class HStore implements HConstants {
private void rowAtOrBeforeWithCandidates(final HStoreKey startKey,
final MapFile.Reader map, final byte[] row,
final SortedMap<HStoreKey, Long> candidateKeys, final long now)
final SortedMap<HStoreKey, Long> candidateKeys,
final Set<HStoreKey> deletes, final long now)
throws IOException {
HStoreKey readkey = new HStoreKey();
ImmutableBytesWritable readval = new ImmutableBytesWritable();
@@ -1650,15 +1647,9 @@ public class HStore implements HConstants {
if (Bytes.equals(readkey.getRow(), row)) {
strippedKey = stripTimestamp(readkey);
if (!HLogEdit.isDeleted(readval.get())) {
if (ttl == HConstants.FOREVER ||
now < readkey.getTimestamp() + ttl) {
if (notExpiredAndNotInDeletes(this.ttl, readkey, now, deletes)) {
candidateKeys.put(strippedKey,
new Long(readkey.getTimestamp()));
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("rowAtOrBeforeWithCandidates 1: " + readkey +
": expired, skipped");
}
}
} else {
// If the candidate keys contain any that might match by timestamp,
@@ -1682,14 +1673,8 @@ public class HStore implements HConstants {
// we're seeking yet, so this row is a candidate for closest
// (assuming that it isn't a delete).
if (!HLogEdit.isDeleted(readval.get())) {
if (ttl == HConstants.FOREVER ||
now < readkey.getTimestamp() + ttl) {
if (notExpiredAndNotInDeletes(this.ttl, readkey, now, deletes)) {
candidateKeys.put(strippedKey, Long.valueOf(readkey.getTimestamp()));
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("rowAtOrBeforeWithCandidates 2: " + readkey +
": expired, skipped");
}
}
} else {
// If the candidate keys contain any that might match by timestamp,
@@ -1902,4 +1887,4 @@ public class HStore implements HConstants {
return key;
}
}
}
}

View File

@@ -24,6 +24,7 @@ import java.io.IOException;
import java.rmi.UnexpectedException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -327,24 +328,38 @@ class Memcache {
* @param row Row to look for.
* @param candidateKeys Map of candidate keys (Accumulation over lots of
* lookup over stores and memcaches)
* @param deletes Deletes collected so far.
*/
void getRowKeyAtOrBefore(final byte [] row,
SortedMap<HStoreKey, Long> candidateKeys) {
void getRowKeyAtOrBefore(final byte [] row,
final SortedMap<HStoreKey, Long> candidateKeys) {
getRowKeyAtOrBefore(row, candidateKeys, new HashSet<HStoreKey>());
}
/**
* @param row Row to look for.
* @param candidateKeys Map of candidate keys (Accumulation over lots of
* lookup over stores and memcaches)
* @param deletes Deletes collected so far.
*/
void getRowKeyAtOrBefore(final byte [] row,
final SortedMap<HStoreKey, Long> candidateKeys,
final Set<HStoreKey> deletes) {
this.lock.readLock().lock();
try {
synchronized (memcache) {
internalGetRowKeyAtOrBefore(memcache, row, candidateKeys);
getRowKeyAtOrBefore(memcache, row, candidateKeys, deletes);
}
synchronized (snapshot) {
internalGetRowKeyAtOrBefore(snapshot, row, candidateKeys);
getRowKeyAtOrBefore(snapshot, row, candidateKeys, deletes);
}
} finally {
this.lock.readLock().unlock();
}
}
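
Both memcache maps consulted above are sorted by HStoreKey, which is what makes a "row at or before" lookup cheap. A minimal sketch of the underlying idea using a TreeMap keyed by row (a simplification of the HStoreKey-keyed maps in the patch):

import java.util.TreeMap;

class RowAtOrBeforeSketch {
  public static void main(String[] args) {
    TreeMap<String, byte[]> rows = new TreeMap<>();
    rows.put("row10", new byte[0]);
    rows.put("row20", new byte[0]);
    rows.put("row30", new byte[0]);
    System.out.println(rows.floorKey("row25")); // row20: closest row at or before
    System.out.println(rows.floorKey("row30")); // row30: an exact match counts
    System.out.println(rows.floorKey("row05")); // null: nothing at or before
  }
}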
private void internalGetRowKeyAtOrBefore(SortedMap<HStoreKey, byte []> map,
byte [] row, SortedMap<HStoreKey, Long> candidateKeys) {
private void getRowKeyAtOrBefore(final SortedMap<HStoreKey, byte []> map,
final byte [] row, final SortedMap<HStoreKey, Long> candidateKeys,
final Set<HStoreKey> deletes) {
// We want the earliest possible to start searching from. Start before
// the candidate key in case it turns out a delete came in later.
HStoreKey search_key = candidateKeys.isEmpty()? new HStoreKey(row):
@@ -371,13 +386,12 @@ class Memcache {
found_key = key_iterator.next();
if (Bytes.compareTo(found_key.getRow(), row) <= 0) {
if (HLogEdit.isDeleted(tailMap.get(found_key))) {
handleDeleted(found_key, candidateKeys);
handleDeleted(found_key, candidateKeys, deletes);
if (deletedOrExpiredRow == null) {
deletedOrExpiredRow = found_key;
}
} else {
if (ttl == HConstants.FOREVER ||
now < found_key.getTimestamp() + ttl) {
if (HStore.notExpiredAndNotInDeletes(this.ttl, found_key, now, deletes)) {
HStoreKey strippedKey = stripTimestamp(found_key);
candidateKeys.put(strippedKey,
new Long(found_key.getTimestamp()));
@@ -395,12 +409,13 @@ class Memcache {
}
}
if (candidateKeys.isEmpty() && deletedOrExpiredRow != null) {
getRowKeyBefore(map, deletedOrExpiredRow, candidateKeys, victims, now);
getRowKeyBefore(map, deletedOrExpiredRow, candidateKeys, victims,
deletes, now);
}
} else {
// The tail didn't contain any keys that matched our criteria, or was
// empty. Examine all the keys that proceed our splitting point.
getRowKeyBefore(map, search_key, candidateKeys, victims, now);
getRowKeyBefore(map, search_key, candidateKeys, victims, deletes, now);
}
// Remove expired victims from the map.
for (HStoreKey victim: victims) {
@@ -419,7 +434,8 @@ class Memcache {
*/
private void getRowKeyBefore(SortedMap<HStoreKey, byte []> map,
HStoreKey search_key, SortedMap<HStoreKey, Long> candidateKeys,
List<HStoreKey> victims, final long now) {
final List<HStoreKey> expires, final Set<HStoreKey> deletes,
final long now) {
SortedMap<HStoreKey, byte []> headMap = map.headMap(search_key);
// If we tried to create a headMap and got an empty map, then there are
// no keys at or before the search key, so we're done.
@@ -429,35 +445,36 @@ class Memcache {
// If there aren't any candidate keys at this point, we need to search
// backwards until we find at least one candidate or run out of headMap.
HStoreKey found_key = null;
if (candidateKeys.isEmpty()) {
Set<HStoreKey> keys = headMap.keySet();
HStoreKey [] cells = keys.toArray(new HStoreKey[keys.size()]);
byte [] lastRowFound = null;
for (int i = cells.length - 1; i >= 0; i--) {
HStoreKey thisKey = cells[i];
HStoreKey found_key = cells[i];
// if the last row we found a candidate key for is different than
// the row of the current candidate, we can stop looking -- if its
// not a delete record.
boolean deleted = HLogEdit.isDeleted(headMap.get(thisKey));
boolean deleted = HLogEdit.isDeleted(headMap.get(found_key));
if (lastRowFound != null &&
!Bytes.equals(lastRowFound, thisKey.getRow()) && !deleted) {
!Bytes.equals(lastRowFound, found_key.getRow()) && !deleted) {
break;
}
// If this isn't a delete, record it as a candidate key. Also
// take note of the row of this candidate so that we'll know when
// we cross the row boundary into the previous row.
if (!deleted) {
if (ttl == HConstants.FOREVER || now < thisKey.getTimestamp() + ttl) {
lastRowFound = thisKey.getRow();
candidateKeys.put(stripTimestamp(thisKey),
new Long(thisKey.getTimestamp()));
if (HStore.notExpiredAndNotInDeletes(this.ttl, found_key, now, deletes)) {
lastRowFound = found_key.getRow();
candidateKeys.put(stripTimestamp(found_key),
new Long(found_key.getTimestamp()));
} else {
victims.add(found_key);
expires.add(found_key);
if (LOG.isDebugEnabled()) {
LOG.debug("getRowKeyBefore: " + found_key + ": expired, skipped");
}
}
} else {
deletes.add(found_key);
}
}
} else {
@@ -469,16 +486,17 @@ class Memcache {
headMap.tailMap(new HStoreKey(headMap.lastKey().getRow()));
Iterator<HStoreKey> key_iterator = thisRowTailMap.keySet().iterator();
do {
found_key = key_iterator.next();
HStoreKey found_key = key_iterator.next();
if (HLogEdit.isDeleted(thisRowTailMap.get(found_key))) {
handleDeleted(found_key, candidateKeys);
handleDeleted(found_key, candidateKeys, deletes);
} else {
if (ttl == HConstants.FOREVER ||
now < found_key.getTimestamp() + ttl) {
now < found_key.getTimestamp() + ttl ||
!deletes.contains(found_key)) {
candidateKeys.put(stripTimestamp(found_key),
Long.valueOf(found_key.getTimestamp()));
} else {
victims.add(found_key);
expires.add(found_key);
if (LOG.isDebugEnabled()) {
LOG.debug("internalGetRowKeyAtOrBefore: " + found_key +
": expired, skipped");
@@ -490,7 +508,9 @@ class Memcache {
}
private void handleDeleted(final HStoreKey k,
final SortedMap<HStoreKey, Long> candidateKeys) {
final SortedMap<HStoreKey, Long> candidateKeys,
final Set<HStoreKey> deletes) {
deletes.add(k);
HStoreKey strippedKey = stripTimestamp(k);
if (candidateKeys.containsKey(strippedKey)) {
long bestCandidateTs =

View File

@@ -91,7 +91,7 @@ public class TestGet2 extends HBaseTestCase implements HConstants {
batchUpdate.delete(COLUMNS[0]);
region.batchUpdate(batchUpdate);
results = region.getClosestRowBefore(Bytes.toBytes(T10));
results = region.getClosestRowBefore(Bytes.toBytes(T20));
assertEquals(T10, new String(results.get(COLUMNS[0]).getValue()));
batchUpdate = new BatchUpdate(T30);