diff --git a/src/java/org/apache/hadoop/hbase/HStoreKey.java b/src/java/org/apache/hadoop/hbase/HStoreKey.java index 33b0da39ee1..98f31109911 100644 --- a/src/java/org/apache/hadoop/hbase/HStoreKey.java +++ b/src/java/org/apache/hadoop/hbase/HStoreKey.java @@ -263,8 +263,8 @@ public class HStoreKey implements WritableComparable { /** {@inheritDoc} */ @Override public int hashCode() { - int result = this.row.hashCode(); - result ^= this.column.hashCode(); + int result = Bytes.hashCode(this.row); + result ^= Bytes.hashCode(this.column); result ^= this.timestamp; return result; } diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 74cdc031ecb..3d0b2b25769 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -317,7 +317,7 @@ public class HRegion implements HConstants { new ConcurrentHashMap(); private final Map> targetColumns = new ConcurrentHashMap>(); - private volatile boolean flushRequested; + private volatile boolean flushRequested = false; // Default access because read by tests. final Map stores = new ConcurrentHashMap(); final AtomicLong memcacheSize = new AtomicLong(0); @@ -439,7 +439,6 @@ public class HRegion implements HConstants { this.conf = conf; this.regionInfo = regionInfo; this.flushListener = flushListener; - this.flushRequested = false; this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000); String encodedNameStr = Integer.toString(this.regionInfo.getEncodedName()); this.regiondir = new Path(basedir, encodedNameStr); @@ -1193,10 +1192,9 @@ public class HRegion implements HConstants { storeSet.add(store); } } - } - else + } else { storeSet.addAll(stores.values()); - + } // For each column name that is just a column family, open the store // related to it and fetch everything for that row. HBASE-631 // Also remove each store from storeSet so that these stores @@ -1427,8 +1425,8 @@ public class HRegion implements HConstants { */ private synchronized void checkResources() { boolean blocked = false; - - while (this.memcacheSize.get() >= this.blockingMemcacheSize) { + while (this.memcacheSize.get() > this.blockingMemcacheSize) { + requestFlush(); if (!blocked) { LOG.info("Blocking updates for '" + Thread.currentThread().getName() + "' on region " + Bytes.toString(getRegionName()) + ": Memcache size " + @@ -1436,7 +1434,6 @@ public class HRegion implements HConstants { " is >= than blocking " + StringUtils.humanReadableInt(this.blockingMemcacheSize) + " size"); } - blocked = true; try { wait(threadWakeFrequency); @@ -1610,16 +1607,34 @@ public class HRegion implements HConstants { getStore(key.getColumn()).add(key, e.getValue())); } flush = this.flushListener != null && !this.flushRequested && - size > this.memcacheFlushSize; + isFlushSize(size); } finally { this.updatesLock.readLock().unlock(); } if (flush) { // Request a cache flush. Do it outside update lock. - this.flushListener.request(this); - this.flushRequested = true; + requestFlush(); } } + + private void requestFlush() { + if (this.flushListener == null || this.flushRequested) { + return; + } + this.flushListener.request(this); + this.flushRequested = true; + if (LOG.isDebugEnabled()) { + LOG.debug("Flush requested on " + this); + } + } + + /* + * @param size + * @return True if size is over the flush threshold + */ + private boolean isFlushSize(final long size) { + return size > this.memcacheFlushSize; + } // Do any reconstruction needed from the log @SuppressWarnings("unused") diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HStore.java b/src/java/org/apache/hadoop/hbase/regionserver/HStore.java index 0b61b36361b..242863f2677 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -608,16 +608,10 @@ public class HStore implements HConstants { HStoreKey curkey = es.getKey(); byte[] bytes = es.getValue(); if (HStoreKey.matchingFamily(this.family.getName(), curkey.getColumn())) { - if (ttl == HConstants.FOREVER || - now < curkey.getTimestamp() + ttl) { + if (!isExpired(curkey, ttl, now)) { entries++; out.append(curkey, new ImmutableBytesWritable(bytes)); flushed += curkey.getSize() + (bytes == null ? 0 : bytes.length); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("internalFlushCache: " + curkey + - ": expired, skipped"); - } } } } @@ -930,12 +924,8 @@ public class HStore implements HConstants { if (sk.getRow().length != 0 && sk.getColumn().length != 0) { // Only write out objects which have a non-zero length key and // value - if (ttl == HConstants.FOREVER || now < sk.getTimestamp() + ttl) { + if (!isExpired(sk, ttl, now)) { compactedOut.append(sk, vals[smallestKey]); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("compactHStoreFiles: " + sk + ": expired, deleted"); - } } } } @@ -1197,17 +1187,12 @@ public class HStore implements HConstants { // there aren't any pending deletes. if (!(deletes.containsKey(readcol) && deletes.get(readcol).longValue() >= readkey.getTimestamp())) { - if (ttl == HConstants.FOREVER || - now < readkey.getTimestamp() + ttl) { + if (!isExpired(readkey, ttl, now)) { results.put(readcol, new Cell(readval.get(), readkey.getTimestamp())); // need to reinstantiate the readval so we can reuse it, // otherwise next iteration will destroy our result readval = new ImmutableBytesWritable(); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("getFullFromMapFile: " + readkey + ": expired, skipped"); - } } } } @@ -1259,7 +1244,7 @@ public class HStore implements HConstants { // the old non-delete cell value in a later store file. If we don't keep // around the fact that the cell was deleted in a newer record, we end up // returning the old value if user is asking for more than one version. - // This List of deletes should not large since we are only keeping rows + // This List of deletes should not be large since we are only keeping rows // and columns that match those set on the scanner and which have delete // values. If memory usage becomes an issue, could redo as bloom filter. Map> deletes = @@ -1283,13 +1268,8 @@ public class HStore implements HConstants { continue; } if (!isDeleted(readkey, readval.get(), true, deletes)) { - if (ttl == HConstants.FOREVER || - now < readkey.getTimestamp() + ttl) { + if (!isExpired(readkey, ttl, now)) { results.add(new Cell(readval.get(), readkey.getTimestamp())); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("get: " + readkey + ": expired, skipped"); - } } // Perhaps only one version is wanted. I could let this // test happen later in the for loop test but it would cost @@ -1304,13 +1284,8 @@ public class HStore implements HConstants { !hasEnoughVersions(numVersions, results); readval = new ImmutableBytesWritable()) { if (!isDeleted(readkey, readval.get(), true, deletes)) { - if (ttl == HConstants.FOREVER || - now < readkey.getTimestamp() + ttl) { + if (!isExpired(readkey, ttl, now)) { results.add(new Cell(readval.get(), readkey.getTimestamp())); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("get: " + readkey + ": expired, skipped"); - } } } } @@ -1393,14 +1368,8 @@ public class HStore implements HConstants { // in the memcache if (!isDeleted(readkey, readval.get(), false, null) && !keys.contains(readkey)) { - if (ttl == HConstants.FOREVER || - now < readkey.getTimestamp() + ttl) { + if (!isExpired(readkey, ttl, now)) { keys.add(new HStoreKey(readkey)); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("getKeys: " + readkey + - ": expired, skipped"); - } } // if we've collected enough versions, then exit the loop. @@ -1439,24 +1408,34 @@ public class HStore implements HConstants { byte [] getRowKeyAtOrBefore(final byte [] row) throws IOException{ // Map of HStoreKeys that are candidates for holding the row key that - // most closely matches what we're looking for. We'll have to update it - // deletes found all over the place as we go along before finally reading - // the best key out of it at the end. + // most closely matches what we're looking for. We'll have to update it as + // deletes are found all over the place as we go along before finally + // reading the best key out of it at the end. SortedMap candidateKeys = new TreeMap(); - // Obtain read lock + + // Keep a list of deleted cell keys. We need this because as we go through + // the store files, the cell with the delete marker may be in one file and + // the old non-delete cell value in a later store file. If we don't keep + // around the fact that the cell was deleted in a newer record, we end up + // returning the old value if user is asking for more than one version. + // This List of deletes should not be large since we are only keeping rows + // and columns that match those set on the scanner and which have delete + // values. If memory usage becomes an issue, could redo as bloom filter. + Set deletes = new HashSet(); + + this.lock.readLock().lock(); try { - // Process each store file. Run through from oldest to newest so deletes - // have chance to overshadow deleted cells + // First go to the memcache. Pick up deletes and candidates. + this.memcache.getRowKeyAtOrBefore(row, candidateKeys, deletes); + + // Process each store file. Run through from newest to oldest. + // This code below is very close to the body of the getKeys method. MapFile.Reader[] maparray = getReaders(); - for (int i = 0; i < maparray.length; i++) { + for(int i = maparray.length - 1; i >= 0; i--) { // Update the candidate keys from the current map file - rowAtOrBeforeFromMapFile(maparray[i], row, candidateKeys); + rowAtOrBeforeFromMapFile(maparray[i], row, candidateKeys, deletes); } - - // Finally, check the memcache - this.memcache.getRowKeyAtOrBefore(row, candidateKeys); - // Return the best key from candidateKeys return candidateKeys.isEmpty()? null: candidateKeys.lastKey().getRow(); } finally { @@ -1472,8 +1451,9 @@ public class HStore implements HConstants { * @param candidateKeys * @throws IOException */ - private void rowAtOrBeforeFromMapFile(MapFile.Reader map, final byte [] row, - SortedMap candidateKeys) + private void rowAtOrBeforeFromMapFile(final MapFile.Reader map, + final byte [] row, final SortedMap candidateKeys, + final Set deletes) throws IOException { HStoreKey startKey = new HStoreKey(); ImmutableBytesWritable startValue = new ImmutableBytesWritable(); @@ -1491,9 +1471,9 @@ public class HStore implements HConstants { long now = System.currentTimeMillis(); // if there aren't any candidate keys yet, we'll do some things different if (candidateKeys.isEmpty()) { - rowAtOrBeforeCandidate(startKey, map, row, candidateKeys, now); + rowAtOrBeforeCandidate(startKey, map, row, candidateKeys, deletes, now); } else { - rowAtOrBeforeWithCandidates(startKey, map, row, candidateKeys, + rowAtOrBeforeWithCandidates(startKey, map, row, candidateKeys, deletes, now); } } @@ -1510,7 +1490,8 @@ public class HStore implements HConstants { */ private void rowAtOrBeforeCandidate(final HStoreKey startKey, final MapFile.Reader map, final byte[] row, - final SortedMap candidateKeys, final long now) + final SortedMap candidateKeys, + final Set deletes, final long now) throws IOException { // if the row we're looking for is past the end of this mapfile, set the // search key to be the last key. If its a deleted key, then we'll back @@ -1525,9 +1506,31 @@ public class HStore implements HConstants { searchKey = startKey; } } - rowAtOrBeforeCandidate(map, searchKey, candidateKeys, now); + rowAtOrBeforeCandidate(map, searchKey, candidateKeys, deletes, now); + } + + /* + * @param ttlSetting + * @param hsk + * @param now + * @param deletes + * @return True if key has not expired and is not in passed set of deletes. + */ + static boolean notExpiredAndNotInDeletes(final long ttl, + final HStoreKey hsk, final long now, final Set deletes) { + return !isExpired(hsk, ttl, now) && !deletes.contains(hsk); } + private static boolean isExpired(final HStoreKey hsk, final long ttl, + final long now) { + boolean result = ttl != HConstants.FOREVER && now > hsk.getTimestamp() + ttl; + if (LOG.isDebugEnabled()) { + LOG.debug("rowAtOrBeforeCandidate 1:" + hsk + + ": expired, skipped"); + } + return result; + } + /* Find a candidate for row that is at or before passed key, sk, in mapfile. * @param map * @param sk Key to go search the mapfile with. @@ -1538,7 +1541,7 @@ public class HStore implements HConstants { */ private void rowAtOrBeforeCandidate(final MapFile.Reader map, final HStoreKey sk, final SortedMap candidateKeys, - final long now) + final Set deletes, final long now) throws IOException { HStoreKey searchKey = sk; HStoreKey readkey = new HStoreKey(); @@ -1557,19 +1560,16 @@ public class HStore implements HConstants { // as a candidate key if (Bytes.equals(readkey.getRow(), searchKey.getRow())) { if (!HLogEdit.isDeleted(readval.get())) { - if (ttl == HConstants.FOREVER || - now < readkey.getTimestamp() + ttl) { + if (notExpiredAndNotInDeletes(this.ttl, readkey, now, deletes)) { candidateKeys.put(stripTimestamp(readkey), new Long(readkey.getTimestamp())); foundCandidate = true; + // NOTE! Continue. continue; } - if (LOG.isDebugEnabled()) { - LOG.debug("rowAtOrBeforeCandidate 1:" + readkey + - ": expired, skipped"); - } } // Deleted value. + deletes.add(readkey); if (deletedOrExpiredRow == null) { deletedOrExpiredRow = new HStoreKey(readkey); } @@ -1582,18 +1582,14 @@ public class HStore implements HConstants { // we're seeking yet, so this row is a candidate for closest // (assuming that it isn't a delete). if (!HLogEdit.isDeleted(readval.get())) { - if (ttl == HConstants.FOREVER || - now < readkey.getTimestamp() + ttl) { + if (notExpiredAndNotInDeletes(this.ttl, readkey, now, deletes)) { candidateKeys.put(stripTimestamp(readkey), new Long(readkey.getTimestamp())); foundCandidate = true; continue; } - if (LOG.isDebugEnabled()) { - LOG.debug("rowAtOrBeforeCandidate 2:" + readkey + - ": expired, skipped"); - } } + deletes.add(readkey); if (deletedOrExpiredRow == null) { deletedOrExpiredRow = new HStoreKey(readkey); } @@ -1619,7 +1615,8 @@ public class HStore implements HConstants { private void rowAtOrBeforeWithCandidates(final HStoreKey startKey, final MapFile.Reader map, final byte[] row, - final SortedMap candidateKeys, final long now) + final SortedMap candidateKeys, + final Set deletes, final long now) throws IOException { HStoreKey readkey = new HStoreKey(); ImmutableBytesWritable readval = new ImmutableBytesWritable(); @@ -1650,15 +1647,9 @@ public class HStore implements HConstants { if (Bytes.equals(readkey.getRow(), row)) { strippedKey = stripTimestamp(readkey); if (!HLogEdit.isDeleted(readval.get())) { - if (ttl == HConstants.FOREVER || - now < readkey.getTimestamp() + ttl) { + if (notExpiredAndNotInDeletes(this.ttl, readkey, now, deletes)) { candidateKeys.put(strippedKey, new Long(readkey.getTimestamp())); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("rowAtOrBeforeWithCandidates 1: " + readkey + - ": expired, skipped"); - } } } else { // If the candidate keys contain any that might match by timestamp, @@ -1682,14 +1673,8 @@ public class HStore implements HConstants { // we're seeking yet, so this row is a candidate for closest // (assuming that it isn't a delete). if (!HLogEdit.isDeleted(readval.get())) { - if (ttl == HConstants.FOREVER || - now < readkey.getTimestamp() + ttl) { + if (notExpiredAndNotInDeletes(this.ttl, readkey, now, deletes)) { candidateKeys.put(strippedKey, Long.valueOf(readkey.getTimestamp())); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("rowAtOrBeforeWithCandidates 2: " + readkey + - ": expired, skipped"); - } } } else { // If the candidate keys contain any that might match by timestamp, @@ -1902,4 +1887,4 @@ public class HStore implements HConstants { return key; } } -} +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java b/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java index 253ba7043b4..1220b525d24 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.rmi.UnexpectedException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -327,24 +328,38 @@ class Memcache { * @param row Row to look for. * @param candidateKeys Map of candidate keys (Accumulation over lots of * lookup over stores and memcaches) + * @param deletes Deletes collected so far. */ - void getRowKeyAtOrBefore(final byte [] row, - SortedMap candidateKeys) { + void getRowKeyAtOrBefore(final byte [] row, + final SortedMap candidateKeys) { + getRowKeyAtOrBefore(row, candidateKeys, new HashSet()); + } + + /** + * @param row Row to look for. + * @param candidateKeys Map of candidate keys (Accumulation over lots of + * lookup over stores and memcaches) + * @param deletes Deletes collected so far. + */ + void getRowKeyAtOrBefore(final byte [] row, + final SortedMap candidateKeys, + final Set deletes) { this.lock.readLock().lock(); try { synchronized (memcache) { - internalGetRowKeyAtOrBefore(memcache, row, candidateKeys); + getRowKeyAtOrBefore(memcache, row, candidateKeys, deletes); } synchronized (snapshot) { - internalGetRowKeyAtOrBefore(snapshot, row, candidateKeys); + getRowKeyAtOrBefore(snapshot, row, candidateKeys, deletes); } } finally { this.lock.readLock().unlock(); } } - private void internalGetRowKeyAtOrBefore(SortedMap map, - byte [] row, SortedMap candidateKeys) { + private void getRowKeyAtOrBefore(final SortedMap map, + final byte [] row, final SortedMap candidateKeys, + final Set deletes) { // We want the earliest possible to start searching from. Start before // the candidate key in case it turns out a delete came in later. HStoreKey search_key = candidateKeys.isEmpty()? new HStoreKey(row): @@ -371,13 +386,12 @@ class Memcache { found_key = key_iterator.next(); if (Bytes.compareTo(found_key.getRow(), row) <= 0) { if (HLogEdit.isDeleted(tailMap.get(found_key))) { - handleDeleted(found_key, candidateKeys); + handleDeleted(found_key, candidateKeys, deletes); if (deletedOrExpiredRow == null) { deletedOrExpiredRow = found_key; } } else { - if (ttl == HConstants.FOREVER || - now < found_key.getTimestamp() + ttl) { + if (HStore.notExpiredAndNotInDeletes(this.ttl, found_key, now, deletes)) { HStoreKey strippedKey = stripTimestamp(found_key); candidateKeys.put(strippedKey, new Long(found_key.getTimestamp())); @@ -395,12 +409,13 @@ class Memcache { } } if (candidateKeys.isEmpty() && deletedOrExpiredRow != null) { - getRowKeyBefore(map, deletedOrExpiredRow, candidateKeys, victims, now); + getRowKeyBefore(map, deletedOrExpiredRow, candidateKeys, victims, + deletes, now); } } else { // The tail didn't contain any keys that matched our criteria, or was // empty. Examine all the keys that proceed our splitting point. - getRowKeyBefore(map, search_key, candidateKeys, victims, now); + getRowKeyBefore(map, search_key, candidateKeys, victims, deletes, now); } // Remove expired victims from the map. for (HStoreKey victim: victims) { @@ -419,7 +434,8 @@ class Memcache { */ private void getRowKeyBefore(SortedMap map, HStoreKey search_key, SortedMap candidateKeys, - List victims, final long now) { + final List expires, final Set deletes, + final long now) { SortedMap headMap = map.headMap(search_key); // If we tried to create a headMap and got an empty map, then there are // no keys at or before the search key, so we're done. @@ -429,35 +445,36 @@ class Memcache { // If there aren't any candidate keys at this point, we need to search // backwards until we find at least one candidate or run out of headMap. - HStoreKey found_key = null; if (candidateKeys.isEmpty()) { Set keys = headMap.keySet(); HStoreKey [] cells = keys.toArray(new HStoreKey[keys.size()]); byte [] lastRowFound = null; for (int i = cells.length - 1; i >= 0; i--) { - HStoreKey thisKey = cells[i]; + HStoreKey found_key = cells[i]; // if the last row we found a candidate key for is different than // the row of the current candidate, we can stop looking -- if its // not a delete record. - boolean deleted = HLogEdit.isDeleted(headMap.get(thisKey)); + boolean deleted = HLogEdit.isDeleted(headMap.get(found_key)); if (lastRowFound != null && - !Bytes.equals(lastRowFound, thisKey.getRow()) && !deleted) { + !Bytes.equals(lastRowFound, found_key.getRow()) && !deleted) { break; } // If this isn't a delete, record it as a candidate key. Also // take note of the row of this candidate so that we'll know when // we cross the row boundary into the previous row. if (!deleted) { - if (ttl == HConstants.FOREVER || now < thisKey.getTimestamp() + ttl) { - lastRowFound = thisKey.getRow(); - candidateKeys.put(stripTimestamp(thisKey), - new Long(thisKey.getTimestamp())); + if (HStore.notExpiredAndNotInDeletes(this.ttl, found_key, now, deletes)) { + lastRowFound = found_key.getRow(); + candidateKeys.put(stripTimestamp(found_key), + new Long(found_key.getTimestamp())); } else { - victims.add(found_key); + expires.add(found_key); if (LOG.isDebugEnabled()) { LOG.debug("getRowKeyBefore: " + found_key + ": expired, skipped"); } } + } else { + deletes.add(found_key); } } } else { @@ -469,16 +486,17 @@ class Memcache { headMap.tailMap(new HStoreKey(headMap.lastKey().getRow())); Iterator key_iterator = thisRowTailMap.keySet().iterator(); do { - found_key = key_iterator.next(); + HStoreKey found_key = key_iterator.next(); if (HLogEdit.isDeleted(thisRowTailMap.get(found_key))) { - handleDeleted(found_key, candidateKeys); + handleDeleted(found_key, candidateKeys, deletes); } else { if (ttl == HConstants.FOREVER || - now < found_key.getTimestamp() + ttl) { + now < found_key.getTimestamp() + ttl || + !deletes.contains(found_key)) { candidateKeys.put(stripTimestamp(found_key), Long.valueOf(found_key.getTimestamp())); } else { - victims.add(found_key); + expires.add(found_key); if (LOG.isDebugEnabled()) { LOG.debug("internalGetRowKeyAtOrBefore: " + found_key + ": expired, skipped"); @@ -490,7 +508,9 @@ class Memcache { } private void handleDeleted(final HStoreKey k, - final SortedMap candidateKeys) { + final SortedMap candidateKeys, + final Set deletes) { + deletes.add(k); HStoreKey strippedKey = stripTimestamp(k); if (candidateKeys.containsKey(strippedKey)) { long bestCandidateTs = diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestGet2.java b/src/test/org/apache/hadoop/hbase/regionserver/TestGet2.java index 92103e98ccc..f70fccbda3d 100644 --- a/src/test/org/apache/hadoop/hbase/regionserver/TestGet2.java +++ b/src/test/org/apache/hadoop/hbase/regionserver/TestGet2.java @@ -91,7 +91,7 @@ public class TestGet2 extends HBaseTestCase implements HConstants { batchUpdate.delete(COLUMNS[0]); region.batchUpdate(batchUpdate); - results = region.getClosestRowBefore(Bytes.toBytes(T10)); + results = region.getClosestRowBefore(Bytes.toBytes(T20)); assertEquals(T10, new String(results.get(COLUMNS[0]).getValue())); batchUpdate = new BatchUpdate(T30);