HBASE-1577 Move memcache to ConcurrentSkipListMap from ConcurrentSkipListSet

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@788103 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2009-06-24 18:19:31 +00:00
parent cc920fe6bb
commit 0f0a133abf
8 changed files with 156 additions and 176 deletions

View File

@ -395,6 +395,8 @@ Release 0.20.0 - Unreleased
when cluster working and not working (Jon Gray via Stack)
HBASE-1576 TIF needs to be able to set scanner caching size for smaller
row tables & performance
HBASE-1577 Move memcache to ConcurrentSkipListMap from
ConcurrentSkipListSet
OPTIMIZATIONS
HBASE-1412 Change values for delete column and column family in KeyValue

View File

@ -154,12 +154,10 @@ abstract class BaseScanner extends Chore implements HConstants {
int rows = 0;
try {
regionServer = master.connection.getHRegionConnection(region.getServer());
scannerId = regionServer.openScanner(region.getRegionName(),
new Scan().addFamily(HConstants.CATALOG_FAMILY));
new Scan().addFamily(HConstants.CATALOG_FAMILY));
while (true) {
Result values = regionServer.next(scannerId);
if (values == null || values.size() == 0) {
break;
}

View File

@ -235,7 +235,6 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
// dont retry too much
conf.setInt("hbase.client.retries.number", 3);
this.connection = ServerConnectionManager.getConnection(conf);
this.metaRescanInterval =

View File

@ -20,7 +20,6 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -312,8 +311,6 @@ public class HRegion implements HConstants { // , Writable{
}
}
// Play log if one. Delete when done.
doReconstructionLog(oldLogFile, minSeqId, maxSeqId, reporter);
if (fs.exists(oldLogFile)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting old log file: " + oldLogFile);
@ -1479,13 +1476,6 @@ public class HRegion implements HConstants { // , Writable{
// Returns true when the accumulated memcache size exceeds this region's
// configured flush threshold (this.memcacheFlushSize), i.e. a flush is due.
private boolean isFlushSize(final long size) {
return size > this.memcacheFlushSize;
}
// Do any reconstruction needed from the log
protected void doReconstructionLog(Path oldLogFile, long minSeqId, long maxSeqId,
Progressable reporter)
throws UnsupportedEncodingException, IOException {
// Nothing to do (Replaying is done in HStores)
}
protected Store instantiateHStore(Path baseDir,
HColumnDescriptor c, Path oldLogFile, Progressable reporter)
@ -1663,11 +1653,10 @@ public class HRegion implements HConstants { // , Writable{
* It is used to combine scanners from multiple Stores (aka column families).
*/
class RegionScanner implements InternalScanner {
private KeyValueHeap storeHeap;
private byte [] stopRow;
RegionScanner(Scan scan) throws IOException {
RegionScanner(Scan scan) {
if(Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
this.stopRow = null;
} else {
@ -1675,21 +1664,11 @@ public class HRegion implements HConstants { // , Writable{
}
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
try {
for(Map.Entry<byte[], NavigableSet<byte[]>> entry :
for(Map.Entry<byte[], NavigableSet<byte[]>> entry :
scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey());
scanners.add(store.getScanner(scan, entry.getValue()));
}
} catch (IOException e) {
for(KeyValueScanner scanner : scanners) {
if(scanner != null) {
close(scanner);
}
}
throw e;
Store store = stores.get(entry.getKey());
scanners.add(store.getScanner(scan, entry.getValue()));
}
this.storeHeap =
new KeyValueHeap(scanners.toArray(new KeyValueScanner[0]), comparator);

View File

@ -27,12 +27,12 @@ import java.rmi.UnexpectedException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
@ -56,16 +56,15 @@ class Memcache {
private final long ttl;
// Note that since these structures are always accessed with a lock held,
// no additional synchronization is required.
// The currently active sorted set of edits. Using explicit type because
// if I use NavigableSet, I lose some facility -- I can't get a NavigableSet
// when I do tailSet or headSet.
volatile ConcurrentSkipListSet<KeyValue> memcache;
// Memcache. Use a SkipListMap rather than SkipListSet because of the
// better semantics. The Map will overwrite if passed a key it already had
// whereas the Set will not add new KV if key is same though value might be
// different. Value is not important -- just make sure always same
// reference passed.
volatile ConcurrentSkipListMap<KeyValue, Object> memcache;
// Snapshot of memcache. Made for flusher.
volatile ConcurrentSkipListSet<KeyValue> snapshot;
volatile ConcurrentSkipListMap<KeyValue, Object> snapshot;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@ -79,7 +78,12 @@ class Memcache {
// TODO: Fix this guess by studying jprofiler
private final static int ESTIMATED_KV_HEAP_TAX = 60;
/* Value we add memstore 'value'. Memstore backing is a Map
* but we are only interested in its keys.
*/
private static final Object NULL = new Object();
/**
* Default constructor. Used for tests.
*/
@ -98,27 +102,27 @@ class Memcache {
this.comparatorIgnoreTimestamp =
this.comparator.getComparatorIgnoringTimestamps();
this.comparatorIgnoreType = this.comparator.getComparatorIgnoringType();
this.memcache = createSet(c);
this.snapshot = createSet(c);
this.memcache = createMap(c);
this.snapshot = createMap(c);
}
static ConcurrentSkipListSet<KeyValue> createSet(final KeyValue.KVComparator c) {
return new ConcurrentSkipListSet<KeyValue>(c);
static ConcurrentSkipListMap<KeyValue, Object> createMap(final KeyValue.KVComparator c) {
return new ConcurrentSkipListMap<KeyValue, Object>(c);
}
void dump() {
for (KeyValue kv: this.memcache) {
LOG.info(kv);
for (Map.Entry<KeyValue, ?> entry: this.memcache.entrySet()) {
LOG.info(entry.getKey());
}
for (KeyValue kv: this.snapshot) {
LOG.info(kv);
for (Map.Entry<KeyValue, ?> entry: this.snapshot.entrySet()) {
LOG.info(entry.getKey());
}
}
/**
* Creates a snapshot of the current Memcache.
* Snapshot must be cleared by call to {@link #clearSnapshot(SortedMap)}
* To get the snapshot made by this method, use {@link #getSnapshot}.
* Snapshot must be cleared by call to {@link #clearSnapshot(java.util.Map)}
* To get the snapshot made by this method, use {@link #getSnapshot()}
*/
void snapshot() {
this.lock.writeLock().lock();
@ -134,7 +138,7 @@ class Memcache {
// mistake. St.Ack
if (!this.memcache.isEmpty()) {
this.snapshot = this.memcache;
this.memcache = createSet(this.comparator);
this.memcache = createMap(this.comparator);
}
}
} finally {
@ -145,12 +149,12 @@ class Memcache {
/**
* Return the current snapshot.
* Called by flusher to get current snapshot made by a previous
* call to {@link snapshot}.
* call to {@link #snapshot()}
* @return Return snapshot.
* @see {@link #snapshot()}
* @see {@link #clearSnapshot(NavigableSet)}
* @see {@link #clearSnapshot(java.util.Map)}
*/
ConcurrentSkipListSet<KeyValue> getSnapshot() {
ConcurrentSkipListMap<KeyValue, ?> getSnapshot() {
return this.snapshot;
}
@ -160,7 +164,7 @@ class Memcache {
* @throws UnexpectedException
* @see {@link #snapshot()}
*/
void clearSnapshot(final Set<KeyValue> ss)
void clearSnapshot(final Map<KeyValue, ?> ss)
throws UnexpectedException {
this.lock.writeLock().lock();
try {
@ -171,7 +175,7 @@ class Memcache {
// OK. Passed in snapshot is same as current snapshot. If not-empty,
// create a new snapshot and let the old one go.
if (!ss.isEmpty()) {
this.snapshot = createSet(this.comparator);
this.snapshot = createMap(this.comparator);
}
} finally {
this.lock.writeLock().unlock();
@ -187,13 +191,9 @@ class Memcache {
long size = -1;
this.lock.readLock().lock();
try {
boolean notpresent = this.memcache.add(kv);
// if false then memcache is not changed (look memcache.add(kv) docs)
// need to remove kv and add again to replace it
if (!notpresent && this.memcache.remove(kv)) {
this.memcache.add(kv);
}
size = heapSize(kv, notpresent);
// Add anything as value as long as same instance each time.
size = heapSize(kv,
this.memcache.put(kv, NULL) == null);
} finally {
this.lock.readLock().unlock();
}
@ -221,7 +221,7 @@ class Memcache {
try {
boolean notpresent = false;
List<KeyValue> deletes = new ArrayList<KeyValue>();
SortedSet<KeyValue> tailSet = this.memcache.tailSet(delete);
SortedMap<KeyValue, Object> tail = this.memcache.tailMap(delete);
//Parse the delete, so that it is only done once
byte [] deleteBuffer = delete.getBuffer();
@ -251,28 +251,28 @@ class Memcache {
byte deleteType = deleteBuffer[deleteOffset];
//Comparing with tail from memcache
for (KeyValue mem : tailSet) {
DeleteCode res = DeleteCompare.deleteCompare(mem, deleteBuffer,
for (Map.Entry<KeyValue, ?> entry : tail.entrySet()) {
DeleteCode res = DeleteCompare.deleteCompare(entry.getKey(),
deleteBuffer,
deleteRowOffset, deleteRowLen, deleteQualifierOffset,
deleteQualifierLen, deleteTimestampOffset, deleteType,
comparator.getRawComparator());
if (res == DeleteCode.DONE) {
break;
} else if (res == DeleteCode.DELETE) {
deletes.add(mem);
deletes.add(entry.getKey());
} // SKIP
}
//Delete all the entries affected by the last added delete
for(KeyValue del : deletes) {
notpresent = this.memcache.remove(del);
notpresent = this.memcache.remove(del) == null;
size -= heapSize(del, notpresent);
}
//Adding the delete to memcache
notpresent = this.memcache.add(delete);
size += heapSize(delete, notpresent);
// Adding the delete to memcache. Add any value, as long as
// same instance each time.
size += heapSize(delete, this.memcache.put(delete, NULL) == null);
} finally {
this.lock.readLock().unlock();
}
@ -326,21 +326,21 @@ class Memcache {
/*
* @param kv Find row that follows this one. If null, return first.
* @param set Set to look in for a row beyond <code>row</code>.
* @param map Map to look in for a row beyond <code>row</code>.
* @return Next row or null if none found. If one found, will be a new
* KeyValue -- can be destroyed by subsequent calls to this method.
*/
private KeyValue getNextRow(final KeyValue kv,
final NavigableSet<KeyValue> set) {
final NavigableMap<KeyValue, ?> map) {
KeyValue result = null;
SortedSet<KeyValue> tailset = kv == null? set: set.tailSet(kv);
SortedMap<KeyValue, ?> tail = kv == null? map: map.tailMap(kv);
// Iterate until we fall into the next row; i.e. move off current row
for (KeyValue i : tailset) {
if (comparator.compareRows(i, kv) <= 0)
for (Map.Entry<KeyValue, ?> i : tail.entrySet()) {
if (comparator.compareRows(i.getKey(), kv) <= 0)
continue;
// Note: Not suppressing deletes or expired cells. Needs to be handled
// by higher up functions.
result = i;
result = i.getKey();
break;
}
return result;
@ -365,6 +365,7 @@ class Memcache {
* ignores key Type so we can do Set.remove using a delete, i.e. a KeyValue
* with a different Type to the candidate key.
* @param deletes Pass a Set that has a Comparator that ignores key type.
* @param now
*/
void getRowKeyAtOrBefore(final KeyValue kv,
final NavigableSet<KeyValue> candidates,
@ -378,10 +379,10 @@ class Memcache {
}
}
private void getRowKeyAtOrBefore(final ConcurrentSkipListSet<KeyValue> set,
private void getRowKeyAtOrBefore(final ConcurrentSkipListMap<KeyValue, Object> map,
final KeyValue kv, final NavigableSet<KeyValue> candidates,
final NavigableSet<KeyValue> deletes, final long now) {
if (set.isEmpty()) {
if (map.isEmpty()) {
return;
}
// We want the earliest possible to start searching from. Start before
@ -389,21 +390,22 @@ class Memcache {
KeyValue search = candidates.isEmpty()? kv: candidates.first();
// Get all the entries that come equal or after our search key
SortedSet<KeyValue> tailset = set.tailSet(search);
SortedMap<KeyValue, Object> tail = map.tailMap(search);
// if there are items in the tail map, there's either a direct match to
// the search key, or a range of values between the first candidate key
// and the ultimate search key (or the end of the cache)
if (!tailset.isEmpty() &&
this.comparator.compareRows(tailset.first(), search) <= 0) {
if (!tail.isEmpty() &&
this.comparator.compareRows(tail.firstKey(), search) <= 0) {
// Keep looking at cells as long as they are no greater than the
// ultimate search key and there's still records left in the map.
KeyValue deleted = null;
KeyValue found = null;
for (Iterator<KeyValue> iterator = tailset.iterator();
for (Iterator<Map.Entry<KeyValue, Object>> iterator =
tail.entrySet().iterator();
iterator.hasNext() && (found == null ||
this.comparator.compareRows(found, kv) <= 0);) {
found = iterator.next();
found = iterator.next().getKey();
if (this.comparator.compareRows(found, kv) <= 0) {
if (found.isDeleteType()) {
Store.handleDeletes(found, candidates, deletes);
@ -425,12 +427,12 @@ class Memcache {
}
}
if (candidates.isEmpty() && deleted != null) {
getRowKeyBefore(set, deleted, candidates, deletes, now);
getRowKeyBefore(map, deleted, candidates, deletes, now);
}
} else {
// The tail didn't contain any keys that matched our criteria, or was
// empty. Examine all the keys that precede our splitting point.
getRowKeyBefore(set, search, candidates, deletes, now);
getRowKeyBefore(map, search, candidates, deletes, now);
}
}
@ -438,19 +440,19 @@ class Memcache {
* Get row key that comes before passed <code>search_key</code>
* Use when we know search_key is not in the map and we need to search
* earlier in the cache.
* @param set
* @param map
* @param search
* @param candidates
* @param deletes Pass a Set that has a Comparator that ignores key type.
* @param now
*/
private void getRowKeyBefore(ConcurrentSkipListSet<KeyValue> set,
private void getRowKeyBefore(ConcurrentSkipListMap<KeyValue, Object> map,
KeyValue search, NavigableSet<KeyValue> candidates,
final NavigableSet<KeyValue> deletes, final long now) {
NavigableSet<KeyValue> headSet = set.headSet(search);
NavigableMap<KeyValue, Object> headMap = map.headMap(search);
// If we tried to create a headMap and got an empty map, then there are
// no keys at or before the search key, so we're done.
if (headSet.isEmpty()) {
if (headMap.isEmpty()) {
return;
}
@ -458,7 +460,9 @@ class Memcache {
// backwards until we find at least one candidate or run out of headMap.
if (candidates.isEmpty()) {
KeyValue lastFound = null;
for (Iterator<KeyValue> i = headSet.descendingIterator(); i.hasNext();) {
// TODO: Confirm we're iterating in the right order
for (Iterator<KeyValue> i = headMap.descendingKeySet().iterator();
i.hasNext();) {
KeyValue found = i.next();
// if the last row we found a candidate key for is different than
// the row of the current candidate, we can stop looking -- if its
@ -477,14 +481,14 @@ class Memcache {
candidates.add(found);
} else {
// Its expired.
Store.expiredOrDeleted(set, found);
Store.expiredOrDeleted(map, found);
}
} else {
// We are encountering items in reverse. We may have just added
// an item to candidates that this later item deletes. Check. If we
// found something in candidates, remove it from the set.
if (Store.handleDeletes(found, candidates, deletes)) {
remove(set, found);
remove(map, found);
}
}
}
@ -493,11 +497,11 @@ class Memcache {
// the very last row's worth of keys in the headMap, because any
// smaller acceptable candidate keys would have caused us to start
// our search earlier in the list, and we wouldn't be searching here.
SortedSet<KeyValue> rowTailMap =
headSet.tailSet(headSet.last().cloneRow(HConstants.LATEST_TIMESTAMP));
Iterator<KeyValue> i = rowTailMap.iterator();
SortedMap<KeyValue, Object> rowTailMap =
headMap.tailMap(headMap.lastKey().cloneRow(HConstants.LATEST_TIMESTAMP));
Iterator<Map.Entry<KeyValue, Object>> i = rowTailMap.entrySet().iterator();
do {
KeyValue found = i.next();
KeyValue found = i.next().getKey();
if (found.isDeleteType()) {
Store.handleDeletes(found, candidates, deletes);
} else {
@ -506,7 +510,7 @@ class Memcache {
!deletes.contains(found)) {
candidates.add(found);
} else {
Store.expiredOrDeleted(set, found);
Store.expiredOrDeleted(map, found);
}
}
} while (i.hasNext());
@ -515,21 +519,22 @@ class Memcache {
/*
* @param set
* @param map
* @param kv This is a delete record. Remove anything behind this of same
* r/c/ts.
* @return True if we removed anything.
*/
private boolean remove(final NavigableSet<KeyValue> set, final KeyValue kv) {
SortedSet<KeyValue> s = set.tailSet(kv);
if (s.isEmpty()) {
private boolean remove(final NavigableMap<KeyValue, Object> map,
final KeyValue kv) {
SortedMap<KeyValue, Object> m = map.tailMap(kv);
if (m.isEmpty()) {
return false;
}
boolean removed = false;
for (KeyValue k: s) {
if (this.comparatorIgnoreType.compare(k, kv) == 0) {
for (Map.Entry<KeyValue, Object> entry: m.entrySet()) {
if (this.comparatorIgnoreType.compare(entry.getKey(), kv) == 0) {
// Same r/c/ts. Remove it.
s.remove(k);
m.remove(entry.getKey());
removed = true;
continue;
}
@ -565,7 +570,6 @@ class Memcache {
* and it is not necessary to check any storefiles after this.
* <p>
* Otherwise, it will return false and you should continue on.
* @param startKey Starting KeyValue
* @param matcher Column matcher
* @param result List to add results to
* @return true if done with store (early-out), false if not
@ -579,10 +583,7 @@ class Memcache {
return true;
}
matcher.update();
if(internalGet(this.snapshot, matcher, result) || matcher.isDone()) {
return true;
}
return false;
return internalGet(this.snapshot, matcher, result) || matcher.isDone();
} finally {
this.lock.readLock().unlock();
}
@ -590,23 +591,23 @@ class Memcache {
/**
*
* @param set memcache or snapshot
* @param map memcache or snapshot
* @param matcher query matcher
* @param result list to add results to
* @return true if done with store (early-out), false if not
* @throws IOException
*/
private boolean internalGet(SortedSet<KeyValue> set, QueryMatcher matcher,
List<KeyValue> result) throws IOException {
if(set.isEmpty()) return false;
private boolean internalGet(SortedMap<KeyValue, Object> map, QueryMatcher matcher,
List<KeyValue> result)
throws IOException {
if(map.isEmpty()) return false;
// Seek to startKey
SortedSet<KeyValue> tailSet = set.tailSet(matcher.getStartKey());
for (KeyValue kv : tailSet) {
QueryMatcher.MatchCode res = matcher.match(kv);
SortedMap<KeyValue, Object> tail = map.tailMap(matcher.getStartKey());
for (Map.Entry<KeyValue, Object> entry : tail.entrySet()) {
QueryMatcher.MatchCode res = matcher.match(entry.getKey());
switch(res) {
case INCLUDE:
result.add(kv);
result.add(entry.getKey());
break;
case SKIP:
break;
@ -629,12 +630,12 @@ class Memcache {
* in the passed memcache tree.
*/
protected class MemcacheScanner implements KeyValueScanner {
private final NavigableSet<KeyValue> mc;
private final NavigableMap<KeyValue, Object> mc;
private KeyValue current = null;
private List<KeyValue> result = new ArrayList<KeyValue>();
private int idx = 0;
MemcacheScanner(final NavigableSet<KeyValue> mc) {
MemcacheScanner(final NavigableMap<KeyValue, Object> mc) {
this.mc = mc;
}
@ -677,9 +678,9 @@ class Memcache {
* next row.
*/
boolean cacheNextRow() {
SortedSet<KeyValue> keys;
SortedMap<KeyValue, Object> keys;
try {
keys = this.mc.tailSet(this.current);
keys = this.mc.tailMap(this.current);
} catch (Exception e) {
close();
return false;
@ -689,13 +690,14 @@ class Memcache {
return false;
}
this.current = null;
byte [] row = keys.first().getRow();
for (KeyValue key: keys) {
if (comparator.compareRows(key, row) != 0) {
this.current = key;
byte [] row = keys.firstKey().getRow();
for (Map.Entry<KeyValue, Object> key: keys.entrySet()) {
KeyValue kv = key.getKey();
if (comparator.compareRows(kv, row) != 0) {
this.current = kv;
break;
}
result.add(key);
result.add(kv);
}
return true;
}
@ -715,8 +717,6 @@ class Memcache {
* dump and bring up resultant hprof in something like jprofiler which
* allows you get 'deep size' on objects.
* @param args
* @throws InterruptedException
* @throws IOException
*/
public static void main(String [] args) {
RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
@ -751,4 +751,4 @@ class Memcache {
}
LOG.info("Exiting.");
}
}
}

View File

@ -32,7 +32,6 @@ import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -131,7 +130,6 @@ public class Store implements HConstants {
private final int compactionThreshold;
private final int blocksize;
private final boolean blockcache;
private final boolean bloomfilter;
private final Compression.Algorithm compression;
// Comparing KeyValues
@ -162,7 +160,6 @@ public class Store implements HConstants {
this.family = family;
this.fs = fs;
this.conf = conf;
this.bloomfilter = family.isBloomfilter();
this.blockcache = family.isBlockCacheEnabled();
this.blocksize = family.getBlocksize();
this.compression = family.getCompression();
@ -290,8 +287,9 @@ public class Store implements HConstants {
// TODO: This could grow large and blow heap out. Need to get it into
// general memory usage accounting.
long maxSeqIdInLog = -1;
ConcurrentSkipListSet<KeyValue> reconstructedCache =
Memcache.createSet(this.comparator);
// TODO: Move this memstoring over into MemStore.
ConcurrentSkipListMap<KeyValue, Object> reconstructedCache =
Memcache.createMap(this.comparator);
SequenceFile.Reader logReader = new SequenceFile.Reader(this.fs,
reconstructionLog, this.conf);
try {
@ -316,7 +314,8 @@ public class Store implements HConstants {
!val.matchingFamily(family.getName())) {
continue;
}
reconstructedCache.add(val);
// Add anything as value as long as we use same instance each time.
reconstructedCache.put(val, Boolean.TRUE);
editsCount++;
// Every 2k edits, tell the reporter we're making progress.
// Have seen 60k edits taking 3minutes to complete.
@ -467,7 +466,7 @@ public class Store implements HConstants {
boolean flushCache(final long logCacheFlushId) throws IOException {
// Get the snapshot to flush. Presumes that a call to
// this.memcache.snapshot() has happened earlier up in the chain.
ConcurrentSkipListSet<KeyValue> cache = this.memcache.getSnapshot();
ConcurrentSkipListMap<KeyValue, ?> cache = this.memcache.getSnapshot();
// If an exception happens flushing, we let it out without clearing
// the memcache snapshot. The old snapshot will be returned when we say
// 'snapshot', the next time flush comes around.
@ -487,7 +486,7 @@ public class Store implements HConstants {
* @return StoreFile created.
* @throws IOException
*/
private StoreFile internalFlushCache(final ConcurrentSkipListSet<KeyValue> cache,
private StoreFile internalFlushCache(final ConcurrentSkipListMap<KeyValue, ?> cache,
final long logCacheFlushId)
throws IOException {
HFile.Writer writer = null;
@ -505,7 +504,8 @@ public class Store implements HConstants {
writer = getWriter();
int entries = 0;
try {
for (KeyValue kv: cache) {
for (Map.Entry<KeyValue, ?> entry: cache.entrySet()) {
KeyValue kv = entry.getKey();
if (!isExpired(kv, oldestTimestamp)) {
writer.append(kv);
entries++;
@ -559,7 +559,7 @@ public class Store implements HConstants {
* @return Count of store files.
*/
private int updateStorefiles(final long logCacheFlushId,
final StoreFile sf, final NavigableSet<KeyValue> cache)
final StoreFile sf, final NavigableMap<KeyValue, ?> cache)
throws IOException {
int count = 0;
this.lock.writeLock().lock();
@ -974,8 +974,8 @@ public class Store implements HConstants {
return wantedVersions > maxVersions ? maxVersions: wantedVersions;
}
static void expiredOrDeleted(final Set<KeyValue> set, final KeyValue kv) {
boolean b = set.remove(kv);
static void expiredOrDeleted(final Map<KeyValue, Object> set, final KeyValue kv) {
boolean b = set.remove(kv) != null;
if (LOG.isDebugEnabled()) {
LOG.debug(kv.toString() + " expired: " + b);
}
@ -1343,8 +1343,7 @@ public class Store implements HConstants {
* Return a scanner for both the memcache and the HStore files
*/
protected KeyValueScanner getScanner(Scan scan,
final NavigableSet<byte []> targetCols)
throws IOException {
final NavigableSet<byte []> targetCols) {
lock.readLock().lock();
try {
return new StoreScanner(this, scan, targetCols);
@ -1507,13 +1506,13 @@ public class Store implements HConstants {
/**
* Increments the value for the given row/family/qualifier
* @param row
* @param family
* @param f
* @param qualifier
* @param amount
* @return The new value.
* @throws IOException
*/
public ValueAndSize incrementColumnValue(byte [] row, byte [] family,
public ValueAndSize incrementColumnValue(byte [] row, byte [] f,
byte [] qualifier, long amount) throws IOException {
long value = 0;
List<KeyValue> result = new ArrayList<KeyValue>();
@ -1524,7 +1523,7 @@ public class Store implements HConstants {
NavigableSet<byte[]> qualifiers =
new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
qualifiers.add(qualifier);
QueryMatcher matcher = new QueryMatcher(get, family, qualifiers, this.ttl,
QueryMatcher matcher = new QueryMatcher(get, f, qualifiers, this.ttl,
keyComparator, 1);
// Read from Memcache
@ -1540,7 +1539,7 @@ public class Store implements HConstants {
}
// Check if we even have storefiles
if(this.storefiles.isEmpty()) {
return addNewKeyValue(row, family, qualifier, value, amount);
return addNewKeyValue(row, f, qualifier, value, amount);
}
// Get storefiles for this store
@ -1557,16 +1556,16 @@ public class Store implements HConstants {
if(result.size() > 0) {
value = Bytes.toLong(result.get(0).getValue());
}
return addNewKeyValue(row, family, qualifier, value, amount);
return addNewKeyValue(row, f, qualifier, value, amount);
}
private ValueAndSize addNewKeyValue(byte [] row, byte [] family, byte [] qualifier,
private ValueAndSize addNewKeyValue(byte [] row, byte [] f, byte [] qualifier,
long value, long amount) {
long newValue = value + amount;
KeyValue newKv = new KeyValue(row, family, qualifier,
KeyValue newKv = new KeyValue(row, f, qualifier,
System.currentTimeMillis(),
Bytes.toBytes(newValue));
add(newKv);
return new ValueAndSize(newValue, newKv.heapSize());
}
}
}

View File

@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.Iterator;
@ -414,10 +415,12 @@ public class TestHRegion extends HBaseTestCase {
// extract the key values out the memcache:
// This is kinda hacky, but better than nothing...
long now = System.currentTimeMillis();
KeyValue firstKv = region.getStore(fam1).memcache.memcache.first();
KeyValue firstKv = region.getStore(fam1).memcache.memcache.firstKey();
assertTrue(firstKv.getTimestamp() <= now);
now = firstKv.getTimestamp();
for (KeyValue kv : region.getStore(fam1).memcache.memcache) {
for (Map.Entry<KeyValue, ?> entry:
region.getStore(fam1).memcache.memcache.entrySet()) {
KeyValue kv = entry.getKey();
assertTrue(kv.getTimestamp() <= now);
now = kv.getTimestamp();
}

View File

@ -23,8 +23,8 @@ import java.io.IOException;
import java.rmi.UnexpectedException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import junit.framework.TestCase;
@ -108,7 +108,7 @@ public class TestMemcache extends TestCase {
for (int i = 0; i < snapshotCount; i++) {
addRows(this.memcache);
runSnapshot(this.memcache);
Set<KeyValue> ss = this.memcache.getSnapshot();
Map<KeyValue, ?> ss = this.memcache.getSnapshot();
assertEquals("History not being cleared", 0, ss.size());
}
}
@ -153,9 +153,9 @@ public class TestMemcache extends TestCase {
// System.out.println(key);
}
int index = start;
for (KeyValue kv: mc.memcache) {
System.out.println(kv);
byte [] b = kv.getRow();
for (Map.Entry<KeyValue, ?> entry: mc.memcache.entrySet()) {
System.out.println(entry);
byte [] b = entry.getKey().getRow();
// Hardcoded offsets into String
String str = Bytes.toString(b, 13, 4);
byte [] bb = Bytes.toBytes(index);
@ -454,9 +454,9 @@ public class TestMemcache extends TestCase {
expected.add(put1);
assertEquals(3, memcache.memcache.size());
int i=0;
for(KeyValue actual : memcache.memcache) {
assertEquals(expected.get(i++), actual);
int i = 0;
for(Map.Entry<KeyValue, ?> entry : memcache.memcache.entrySet()) {
assertEquals(expected.get(i++), entry.getKey());
}
}
@ -487,9 +487,9 @@ public class TestMemcache extends TestCase {
expected.add(del2);
assertEquals(2, memcache.memcache.size());
int i=0;
for(KeyValue actual : memcache.memcache) {
assertEquals(expected.get(i++), actual);
int i = 0;
for(Map.Entry<KeyValue, ?> entry : memcache.memcache.entrySet()) {
assertEquals(expected.get(i++), entry.getKey());
}
}
@ -522,9 +522,9 @@ public class TestMemcache extends TestCase {
expected.add(put4);
assertEquals(2, memcache.memcache.size());
int i=0;
for(KeyValue actual : memcache.memcache) {
assertEquals(expected.get(i++), actual);
int i = 0;
for(Map.Entry<KeyValue, ?> entry : memcache.memcache.entrySet()) {
assertEquals(expected.get(i++), entry.getKey());
}
}
@ -538,7 +538,7 @@ public class TestMemcache extends TestCase {
KeyValue delete = new KeyValue(row, fam, qf, ts, KeyValue.Type.Delete, val);
memcache.delete(delete);
assertEquals(1, memcache.memcache.size());
assertEquals(delete, memcache.memcache.first());
assertEquals(delete, memcache.memcache.firstKey());
}
public void testRetainsDeleteVersion() throws IOException {
@ -551,7 +551,7 @@ public class TestMemcache extends TestCase {
memcache.delete(delete);
assertEquals(1, memcache.memcache.size());
assertEquals(delete, memcache.memcache.first());
assertEquals(delete, memcache.memcache.firstKey());
}
public void testRetainsDeleteColumn() throws IOException {
// add a put to memcache
@ -563,7 +563,7 @@ public class TestMemcache extends TestCase {
memcache.delete(delete);
assertEquals(1, memcache.memcache.size());
assertEquals(delete, memcache.memcache.first());
assertEquals(delete, memcache.memcache.firstKey());
}
public void testRetainsDeleteFamily() throws IOException {
// add a put to memcache
@ -575,7 +575,7 @@ public class TestMemcache extends TestCase {
memcache.delete(delete);
assertEquals(1, memcache.memcache.size());
assertEquals(delete, memcache.memcache.first());
assertEquals(delete, memcache.memcache.firstKey());
}
@ -609,7 +609,7 @@ public class TestMemcache extends TestCase {
// Save off old state.
int oldHistorySize = hmc.getSnapshot().size();
hmc.snapshot();
Set<KeyValue> ss = hmc.getSnapshot();
Map<KeyValue, ?> ss = hmc.getSnapshot();
// Make some assertions about what just happened.
assertTrue("History size has not increased", oldHistorySize < ss.size());
hmc.clearSnapshot(ss);