HBASE-1577 Move memcache to ConcurrentSkipListMap from ConcurrentSkipListSet; second attempt

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@788515 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2009-06-25 21:22:14 +00:00
parent 7fb19786ee
commit 6ab29a6c47
11 changed files with 501 additions and 152 deletions

View File

@ -1717,7 +1717,7 @@ public class HTable {
for (BatchOperation bo: batchUpdate) {
if (!bo.isPut()) throw new IOException("Only Puts in BU as of 0.20.0");
Put p = new Put(batchUpdate.getRow(), rl);
p.add(bo.getColumn(),batchUpdate.getTimestamp(), bo.getValue());
p.add(bo.getColumn(), batchUpdate.getTimestamp(), bo.getValue());
put(p);
}
}

View File

@ -2266,10 +2266,8 @@ public class HRegion implements HConstants { // , Writable{
long result = 0L;
try {
Store store = stores.get(family);
Store.ValueAndSize vas =
store.incrementColumnValue(row, family, qualifier, amount);
store.incrementColumnValue(row, family, qualifier, amount);
result = vas.value;
long size = this.memstoreSize.addAndGet(vas.sizeAdded);
flush = isFlushSize(size);

View File

@ -1993,7 +1993,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
*/
Integer getLockFromId(long lockId)
throws IOException {
if(lockId == -1L) {
if (lockId == -1L) {
return null;
}
String lockName = String.valueOf(lockId);

View File

@ -0,0 +1,204 @@
/**
* Copyright 2009 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableSet;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.hadoop.hbase.KeyValue;
/**
* A {@link java.util.Set} of {@link KeyValue}s implemented on top of a
* {@link java.util.concurrent.ConcurrentSkipListMap}. Works like a
* {@link java.util.concurrent.ConcurrentSkipListSet} in all but one regard:
* An add will overwrite if already an entry for the added key. In other words,
* where CSLS does "Adds the specified element to this set if it is not already
* present.", this implementation "Adds the specified element to this set EVEN
* if it is already present overwriting what was there previous". The call to
* add returns true if no value in the backing map or false if there was an
* entry with same key (though value may be different).
* <p>Otherwise,
* has same attributes as ConcurrentSkipListSet: e.g. tolerant of concurrent
* get and set and won't throw ConcurrentModificationException when iterating.
*/
class KeyValueSkipListSet implements NavigableSet<KeyValue> {
private final ConcurrentNavigableMap<KeyValue, KeyValue> delegatee;
KeyValueSkipListSet(final KeyValue.KVComparator c) {
this.delegatee = new ConcurrentSkipListMap<KeyValue, KeyValue>(c);
}
KeyValueSkipListSet(final ConcurrentNavigableMap<KeyValue, KeyValue> m) {
this.delegatee = m;
}
/**
* Iterator that maps Iterator calls to return the value component of the
* passed-in Map.Entry Iterator.
*/
static class MapEntryIterator implements Iterator<KeyValue> {
private final Iterator<Map.Entry<KeyValue, KeyValue>> iterator;
MapEntryIterator(final Iterator<Map.Entry<KeyValue, KeyValue>> i) {
this.iterator = i;
}
public boolean hasNext() {
return this.iterator.hasNext();
}
public KeyValue next() {
return this.iterator.next().getValue();
}
public void remove() {
this.iterator.remove();
}
}
public KeyValue ceiling(KeyValue e) {
throw new UnsupportedOperationException("Not implemented");
}
public Iterator<KeyValue> descendingIterator() {
return new MapEntryIterator(this.delegatee.descendingMap().entrySet().
iterator());
}
public NavigableSet<KeyValue> descendingSet() {
throw new UnsupportedOperationException("Not implemented");
}
public KeyValue floor(KeyValue e) {
throw new UnsupportedOperationException("Not implemented");
}
public SortedSet<KeyValue> headSet(final KeyValue toElement) {
return headSet(toElement, false);
}
public NavigableSet<KeyValue> headSet(final KeyValue toElement,
boolean inclusive) {
return new KeyValueSkipListSet(this.delegatee.headMap(toElement, inclusive));
}
public KeyValue higher(KeyValue e) {
throw new UnsupportedOperationException("Not implemented");
}
public Iterator<KeyValue> iterator() {
return new MapEntryIterator(this.delegatee.entrySet().iterator());
}
public KeyValue lower(KeyValue e) {
throw new UnsupportedOperationException("Not implemented");
}
public KeyValue pollFirst() {
throw new UnsupportedOperationException("Not implemented");
}
public KeyValue pollLast() {
throw new UnsupportedOperationException("Not implemented");
}
public SortedSet<KeyValue> subSet(KeyValue fromElement, KeyValue toElement) {
throw new UnsupportedOperationException("Not implemented");
}
public NavigableSet<KeyValue> subSet(KeyValue fromElement,
boolean fromInclusive, KeyValue toElement, boolean toInclusive) {
throw new UnsupportedOperationException("Not implemented");
}
public SortedSet<KeyValue> tailSet(KeyValue fromElement) {
return tailSet(fromElement, true);
}
public NavigableSet<KeyValue> tailSet(KeyValue fromElement, boolean inclusive) {
return new KeyValueSkipListSet(this.delegatee.tailMap(fromElement, inclusive));
}
public Comparator<? super KeyValue> comparator() {
throw new UnsupportedOperationException("Not implemented");
}
public KeyValue first() {
return this.delegatee.get(this.delegatee.firstKey());
}
public KeyValue last() {
return this.delegatee.get(this.delegatee.lastKey());
}
public boolean add(KeyValue e) {
return this.delegatee.put(e, e) == null;
}
public boolean addAll(Collection<? extends KeyValue> c) {
throw new UnsupportedOperationException("Not implemented");
}
public void clear() {
this.delegatee.clear();
}
public boolean contains(Object o) {
return this.delegatee.containsKey(o);
}
public boolean containsAll(Collection<?> c) {
throw new UnsupportedOperationException("Not implemented");
}
public boolean isEmpty() {
return this.delegatee.isEmpty();
}
public boolean remove(Object o) {
return this.delegatee.remove(o) != null;
}
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException("Not implemented");
}
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException("Not implemented");
}
public int size() {
return this.delegatee.size();
}
public Object[] toArray() {
throw new UnsupportedOperationException("Not implemented");
}
public <T> T[] toArray(T[] a) {
throw new UnsupportedOperationException("Not implemented");
}
}

View File

@ -31,6 +31,7 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -50,6 +51,8 @@ import org.apache.hadoop.hbase.util.Bytes;
* this point we let the snapshot go.
* TODO: Adjust size of the memstore when we remove items because they have
* been deleted.
* TODO: With new KVSLS, need to make sure we update HeapSize with difference
* in KV size.
*/
class MemStore {
private static final Log LOG = LogFactory.getLog(MemStore.class);
@ -61,10 +64,10 @@ class MemStore {
// whereas the Set will not add new KV if key is same though value might be
// different. Value is not important -- just make sure always same
// reference passed.
volatile ConcurrentSkipListMap<KeyValue, Object> memstore;
volatile KeyValueSkipListSet kvset;
// Snapshot of memstore. Made for flusher.
volatile ConcurrentSkipListMap<KeyValue, Object> snapshot;
volatile KeyValueSkipListSet snapshot;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@ -79,11 +82,6 @@ class MemStore {
// TODO: Fix this guess by studying jprofiler
private final static int ESTIMATED_KV_HEAP_TAX = 60;
/* Value we add memstore 'value'. Memstore backing is a Map
* but we are only interested in its keys.
*/
private static final Object NULL = new Object();
/**
* Default constructor. Used for tests.
*/
@ -102,20 +100,16 @@ class MemStore {
this.comparatorIgnoreTimestamp =
this.comparator.getComparatorIgnoringTimestamps();
this.comparatorIgnoreType = this.comparator.getComparatorIgnoringType();
this.memstore = createMap(c);
this.snapshot = createMap(c);
}
static ConcurrentSkipListMap<KeyValue, Object> createMap(final KeyValue.KVComparator c) {
return new ConcurrentSkipListMap<KeyValue, Object>(c);
this.kvset = new KeyValueSkipListSet(c);
this.snapshot = new KeyValueSkipListSet(c);
}
void dump() {
for (Map.Entry<KeyValue, ?> entry: this.memstore.entrySet()) {
LOG.info(entry.getKey());
for (KeyValue kv: this.kvset) {
LOG.info(kv);
}
for (Map.Entry<KeyValue, ?> entry: this.snapshot.entrySet()) {
LOG.info(entry.getKey());
for (KeyValue kv: this.snapshot) {
LOG.info(kv);
}
}
@ -136,9 +130,9 @@ class MemStore {
// We used to synchronize on the memstore here but we're inside a
// write lock so removed it. Comment is left in case removal was a
// mistake. St.Ack
if (!this.memstore.isEmpty()) {
this.snapshot = this.memstore;
this.memstore = createMap(this.comparator);
if (!this.kvset.isEmpty()) {
this.snapshot = this.kvset;
this.kvset = new KeyValueSkipListSet(this.comparator);
}
}
} finally {
@ -154,7 +148,7 @@ class MemStore {
* @see {@link #snapshot()}
* @see {@link #clearSnapshot(java.util.Map)}
*/
ConcurrentSkipListMap<KeyValue, ?> getSnapshot() {
KeyValueSkipListSet getSnapshot() {
return this.snapshot;
}
@ -164,7 +158,7 @@ class MemStore {
* @throws UnexpectedException
* @see {@link #snapshot()}
*/
void clearSnapshot(final Map<KeyValue, ?> ss)
void clearSnapshot(final KeyValueSkipListSet ss)
throws UnexpectedException {
this.lock.writeLock().lock();
try {
@ -175,7 +169,7 @@ class MemStore {
// OK. Passed in snapshot is same as current snapshot. If not-empty,
// create a new snapshot and let the old one go.
if (!ss.isEmpty()) {
this.snapshot = createMap(this.comparator);
this.snapshot = new KeyValueSkipListSet(this.comparator);
}
} finally {
this.lock.writeLock().unlock();
@ -191,15 +185,13 @@ class MemStore {
long size = -1;
this.lock.readLock().lock();
try {
// Add anything as value as long as same instance each time.
size = heapSize(kv,
this.memstore.put(kv, NULL) == null);
size = heapSize(kv, this.kvset.add(kv));
} finally {
this.lock.readLock().unlock();
}
return size;
}
/**
* Write a delete
* @param delete
@ -221,7 +213,7 @@ class MemStore {
try {
boolean notpresent = false;
List<KeyValue> deletes = new ArrayList<KeyValue>();
SortedMap<KeyValue, Object> tail = this.memstore.tailMap(delete);
SortedSet<KeyValue> tail = this.kvset.tailSet(delete);
//Parse the delete, so that it is only done once
byte [] deleteBuffer = delete.getBuffer();
@ -251,28 +243,27 @@ class MemStore {
byte deleteType = deleteBuffer[deleteOffset];
//Comparing with tail from memstore
for (Map.Entry<KeyValue, ?> entry : tail.entrySet()) {
DeleteCode res = DeleteCompare.deleteCompare(entry.getKey(),
deleteBuffer,
for (KeyValue kv : tail) {
DeleteCode res = DeleteCompare.deleteCompare(kv, deleteBuffer,
deleteRowOffset, deleteRowLen, deleteQualifierOffset,
deleteQualifierLen, deleteTimestampOffset, deleteType,
comparator.getRawComparator());
if (res == DeleteCode.DONE) {
break;
} else if (res == DeleteCode.DELETE) {
deletes.add(entry.getKey());
deletes.add(kv);
} // SKIP
}
//Delete all the entries effected by the last added delete
for(KeyValue del : deletes) {
notpresent = this.memstore.remove(del) == null;
size -= heapSize(del, notpresent);
for (KeyValue kv : deletes) {
notpresent = this.kvset.remove(kv);
size -= heapSize(kv, notpresent);
}
// Adding the delete to memstore. Add any value, as long as
// same instance each time.
size += heapSize(delete, this.memstore.put(delete, NULL) == null);
size += heapSize(delete, this.kvset.add(delete));
} finally {
this.lock.readLock().unlock();
}
@ -302,8 +293,7 @@ class MemStore {
KeyValue getNextRow(final KeyValue kv) {
this.lock.readLock().lock();
try {
return getLowest(getNextRow(kv, this.memstore),
getNextRow(kv, this.snapshot));
return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot));
} finally {
this.lock.readLock().unlock();
}
@ -325,22 +315,22 @@ class MemStore {
}
/*
* @param kv Find row that follows this one. If null, return first.
* @param key Find row that follows this one. If null, return first.
* @param map Set to look in for a row beyond <code>row</code>.
* @return Next row or null if none found. If one found, will be a new
* KeyValue -- can be destroyed by subsequent calls to this method.
*/
private KeyValue getNextRow(final KeyValue kv,
final NavigableMap<KeyValue, ?> map) {
private KeyValue getNextRow(final KeyValue key,
final NavigableSet<KeyValue> set) {
KeyValue result = null;
SortedMap<KeyValue, ?> tail = kv == null? map: map.tailMap(kv);
SortedSet<KeyValue> tail = key == null? set: set.tailSet(key);
// Iterate until we fall into the next row; i.e. move off current row
for (Map.Entry<KeyValue, ?> i : tail.entrySet()) {
if (comparator.compareRows(i.getKey(), kv) <= 0)
for (KeyValue kv: tail) {
if (comparator.compareRows(kv, key) <= 0)
continue;
// Note: Not suppressing deletes or expired cells. Needs to be handled
// by higher up functions.
result = i.getKey();
result = kv;
break;
}
return result;
@ -372,17 +362,17 @@ class MemStore {
final NavigableSet<KeyValue> deletes, final long now) {
this.lock.readLock().lock();
try {
getRowKeyAtOrBefore(memstore, kv, candidates, deletes, now);
getRowKeyAtOrBefore(kvset, kv, candidates, deletes, now);
getRowKeyAtOrBefore(snapshot, kv, candidates, deletes, now);
} finally {
this.lock.readLock().unlock();
}
}
private void getRowKeyAtOrBefore(final ConcurrentSkipListMap<KeyValue, Object> map,
private void getRowKeyAtOrBefore(final NavigableSet<KeyValue> set,
final KeyValue kv, final NavigableSet<KeyValue> candidates,
final NavigableSet<KeyValue> deletes, final long now) {
if (map.isEmpty()) {
if (set.isEmpty()) {
return;
}
// We want the earliest possible to start searching from. Start before
@ -390,22 +380,21 @@ class MemStore {
KeyValue search = candidates.isEmpty()? kv: candidates.first();
// Get all the entries that come equal or after our search key
SortedMap<KeyValue, Object> tail = map.tailMap(search);
SortedSet<KeyValue> tail = set.tailSet(search);
// if there are items in the tail map, there's either a direct match to
// the search key, or a range of values between the first candidate key
// and the ultimate search key (or the end of the cache)
if (!tail.isEmpty() &&
this.comparator.compareRows(tail.firstKey(), search) <= 0) {
this.comparator.compareRows(tail.first(), search) <= 0) {
// Keep looking at cells as long as they are no greater than the
// ultimate search key and there's still records left in the map.
KeyValue deleted = null;
KeyValue found = null;
for (Iterator<Map.Entry<KeyValue, Object>> iterator =
tail.entrySet().iterator();
for (Iterator<KeyValue> iterator = tail.iterator();
iterator.hasNext() && (found == null ||
this.comparator.compareRows(found, kv) <= 0);) {
found = iterator.next().getKey();
found = iterator.next();
if (this.comparator.compareRows(found, kv) <= 0) {
if (found.isDeleteType()) {
Store.handleDeletes(found, candidates, deletes);
@ -427,12 +416,12 @@ class MemStore {
}
}
if (candidates.isEmpty() && deleted != null) {
getRowKeyBefore(map, deleted, candidates, deletes, now);
getRowKeyBefore(set, deleted, candidates, deletes, now);
}
} else {
// The tail didn't contain any keys that matched our criteria, or was
// empty. Examine all the keys that proceed our splitting point.
getRowKeyBefore(map, search, candidates, deletes, now);
getRowKeyBefore(set, search, candidates, deletes, now);
}
}
@ -440,19 +429,19 @@ class MemStore {
* Get row key that comes before passed <code>search_key</code>
* Use when we know search_key is not in the map and we need to search
* earlier in the cache.
* @param map
* @param set
* @param search
* @param candidates
* @param deletes Pass a Set that has a Comparator that ignores key type.
* @param now
*/
private void getRowKeyBefore(ConcurrentSkipListMap<KeyValue, Object> map,
private void getRowKeyBefore(NavigableSet<KeyValue> set,
KeyValue search, NavigableSet<KeyValue> candidates,
final NavigableSet<KeyValue> deletes, final long now) {
NavigableMap<KeyValue, Object> headMap = map.headMap(search);
NavigableSet<KeyValue> head = set.headSet(search, false);
// If we tried to create a headMap and got an empty map, then there are
// no keys at or before the search key, so we're done.
if (headMap.isEmpty()) {
if (head.isEmpty()) {
return;
}
@ -461,7 +450,7 @@ class MemStore {
if (candidates.isEmpty()) {
KeyValue lastFound = null;
// TODO: Confirm we're iterating in the right order
for (Iterator<KeyValue> i = headMap.descendingKeySet().iterator();
for (Iterator<KeyValue> i = head.descendingIterator();
i.hasNext();) {
KeyValue found = i.next();
// if the last row we found a candidate key for is different than
@ -481,14 +470,14 @@ class MemStore {
candidates.add(found);
} else {
// Its expired.
Store.expiredOrDeleted(map, found);
Store.expiredOrDeleted(set, found);
}
} else {
// We are encountering items in reverse. We may have just added
// an item to candidates that this later item deletes. Check. If we
// found something in candidates, remove it from the set.
if (Store.handleDeletes(found, candidates, deletes)) {
remove(map, found);
remove(set, found);
}
}
}
@ -497,11 +486,11 @@ class MemStore {
// the very last row's worth of keys in the headMap, because any
// smaller acceptable candidate keys would have caused us to start
// our search earlier in the list, and we wouldn't be searching here.
SortedMap<KeyValue, Object> rowTailMap =
headMap.tailMap(headMap.lastKey().cloneRow(HConstants.LATEST_TIMESTAMP));
Iterator<Map.Entry<KeyValue, Object>> i = rowTailMap.entrySet().iterator();
SortedSet<KeyValue> rowTail =
head.tailSet(head.last().cloneRow(HConstants.LATEST_TIMESTAMP));
Iterator<KeyValue> i = rowTail.iterator();
do {
KeyValue found = i.next().getKey();
KeyValue found = i.next();
if (found.isDeleteType()) {
Store.handleDeletes(found, candidates, deletes);
} else {
@ -510,7 +499,7 @@ class MemStore {
!deletes.contains(found)) {
candidates.add(found);
} else {
Store.expiredOrDeleted(map, found);
Store.expiredOrDeleted(set, found);
}
}
} while (i.hasNext());
@ -519,22 +508,22 @@ class MemStore {
/*
* @param map
* @param kv This is a delete record. Remove anything behind this of same
* @param set
* @param delete This is a delete record. Remove anything behind this of same
* r/c/ts.
* @return True if we removed anything.
*/
private boolean remove(final NavigableMap<KeyValue, Object> map,
final KeyValue kv) {
SortedMap<KeyValue, Object> m = map.tailMap(kv);
if (m.isEmpty()) {
private boolean remove(final NavigableSet<KeyValue> set,
final KeyValue delete) {
SortedSet<KeyValue> s = set.tailSet(delete);
if (s.isEmpty()) {
return false;
}
boolean removed = false;
for (Map.Entry<KeyValue, Object> entry: m.entrySet()) {
if (this.comparatorIgnoreType.compare(entry.getKey(), kv) == 0) {
for (KeyValue kv: s) {
if (this.comparatorIgnoreType.compare(kv, delete) == 0) {
// Same r/c/ts. Remove it.
m.remove(entry.getKey());
s.remove(kv);
removed = true;
continue;
}
@ -550,7 +539,7 @@ class MemStore {
this.lock.readLock().lock();
try {
KeyValueScanner [] scanners = new KeyValueScanner[2];
scanners[0] = new MemStoreScanner(this.memstore);
scanners[0] = new MemStoreScanner(this.kvset);
scanners[1] = new MemStoreScanner(this.snapshot);
return scanners;
} finally {
@ -579,7 +568,7 @@ class MemStore {
throws IOException {
this.lock.readLock().lock();
try {
if(internalGet(this.memstore, matcher, result) || matcher.isDone()) {
if(internalGet(this.kvset, matcher, result) || matcher.isDone()) {
return true;
}
matcher.update();
@ -591,23 +580,23 @@ class MemStore {
/**
*
* @param map memstore or snapshot
* @param set memstore or snapshot
* @param matcher query matcher
* @param result list to add results to
* @return true if done with store (early-out), false if not
* @throws IOException
*/
private boolean internalGet(SortedMap<KeyValue, Object> map, QueryMatcher matcher,
List<KeyValue> result)
private boolean internalGet(final NavigableSet<KeyValue> set,
final QueryMatcher matcher, final List<KeyValue> result)
throws IOException {
if(map.isEmpty()) return false;
if(set.isEmpty()) return false;
// Seek to startKey
SortedMap<KeyValue, Object> tail = map.tailMap(matcher.getStartKey());
for (Map.Entry<KeyValue, Object> entry : tail.entrySet()) {
QueryMatcher.MatchCode res = matcher.match(entry.getKey());
SortedSet<KeyValue> tail = set.tailSet(matcher.getStartKey());
for (KeyValue kv : tail) {
QueryMatcher.MatchCode res = matcher.match(kv);
switch(res) {
case INCLUDE:
result.add(entry.getKey());
result.add(kv);
break;
case SKIP:
break;
@ -630,13 +619,13 @@ class MemStore {
* in the passed memstore tree.
*/
protected class MemStoreScanner implements KeyValueScanner {
private final NavigableMap<KeyValue, Object> mc;
private final NavigableSet<KeyValue> kvs;
private KeyValue current = null;
private List<KeyValue> result = new ArrayList<KeyValue>();
private int idx = 0;
MemStoreScanner(final NavigableMap<KeyValue, Object> mc) {
this.mc = mc;
MemStoreScanner(final NavigableSet<KeyValue> s) {
this.kvs = s;
}
public boolean seek(KeyValue key) {
@ -678,9 +667,9 @@ class MemStore {
* next row.
*/
boolean cacheNextRow() {
SortedMap<KeyValue, Object> keys;
SortedSet<KeyValue> keys;
try {
keys = this.mc.tailMap(this.current);
keys = this.kvs.tailSet(this.current);
} catch (Exception e) {
close();
return false;
@ -690,9 +679,8 @@ class MemStore {
return false;
}
this.current = null;
byte [] row = keys.firstKey().getRow();
for (Map.Entry<KeyValue, Object> key: keys.entrySet()) {
KeyValue kv = key.getKey();
byte [] row = keys.first().getRow();
for (KeyValue kv: keys) {
if (comparator.compareRows(kv, row) != 0) {
this.current = kv;
break;

View File

@ -288,8 +288,8 @@ public class Store implements HConstants {
// general memory usage accounting.
long maxSeqIdInLog = -1;
// TODO: Move this memstoring over into MemStore.
ConcurrentSkipListMap<KeyValue, Object> reconstructedCache =
MemStore.createMap(this.comparator);
KeyValueSkipListSet reconstructedCache =
new KeyValueSkipListSet(this.comparator);
SequenceFile.Reader logReader = new SequenceFile.Reader(this.fs,
reconstructionLog, this.conf);
try {
@ -315,7 +315,7 @@ public class Store implements HConstants {
continue;
}
// Add anything as value as long as we use same instance each time.
reconstructedCache.put(val, Boolean.TRUE);
reconstructedCache.add(val);
editsCount++;
// Every 2k edits, tell the reporter we're making progress.
// Have seen 60k edits taking 3minutes to complete.
@ -466,17 +466,17 @@ public class Store implements HConstants {
boolean flushCache(final long logCacheFlushId) throws IOException {
// Get the snapshot to flush. Presumes that a call to
// this.memstore.snapshot() has happened earlier up in the chain.
ConcurrentSkipListMap<KeyValue, ?> cache = this.memstore.getSnapshot();
KeyValueSkipListSet snapshot = this.memstore.getSnapshot();
// If an exception happens flushing, we let it out without clearing
// the memstore snapshot. The old snapshot will be returned when we say
// 'snapshot', the next time flush comes around.
StoreFile sf = internalFlushCache(cache, logCacheFlushId);
StoreFile sf = internalFlushCache(snapshot, logCacheFlushId);
if (sf == null) {
return false;
}
// Add new file to store files. Clear snapshot too while we have the
// Store write lock.
int size = updateStorefiles(logCacheFlushId, sf, cache);
int size = updateStorefiles(logCacheFlushId, sf, snapshot);
return size >= this.compactionThreshold;
}
@ -486,13 +486,13 @@ public class Store implements HConstants {
* @return StoreFile created.
* @throws IOException
*/
private StoreFile internalFlushCache(final ConcurrentSkipListMap<KeyValue, ?> cache,
private StoreFile internalFlushCache(final KeyValueSkipListSet set,
final long logCacheFlushId)
throws IOException {
HFile.Writer writer = null;
long flushed = 0;
// Don't flush if there are no entries.
if (cache.size() == 0) {
if (set.size() == 0) {
return null;
}
long oldestTimestamp = System.currentTimeMillis() - ttl;
@ -504,8 +504,7 @@ public class Store implements HConstants {
writer = getWriter();
int entries = 0;
try {
for (Map.Entry<KeyValue, ?> entry: cache.entrySet()) {
KeyValue kv = entry.getKey();
for (KeyValue kv: set) {
if (!isExpired(kv, oldestTimestamp)) {
writer.append(kv);
entries++;
@ -554,12 +553,12 @@ public class Store implements HConstants {
* Change storefiles adding into place the Reader produced by this new flush.
* @param logCacheFlushId
* @param sf
* @param cache That was used to make the passed file <code>p</code>.
* @param set That was used to make the passed file <code>p</code>.
* @throws IOException
* @return Count of store files.
*/
private int updateStorefiles(final long logCacheFlushId,
final StoreFile sf, final NavigableMap<KeyValue, ?> cache)
final StoreFile sf, final KeyValueSkipListSet set)
throws IOException {
int count = 0;
this.lock.writeLock().lock();
@ -568,7 +567,7 @@ public class Store implements HConstants {
count = this.storefiles.size();
// Tell listeners of the change in readers.
notifyChangedReadersObservers();
this.memstore.clearSnapshot(cache);
this.memstore.clearSnapshot(set);
return count;
} finally {
this.lock.writeLock().unlock();
@ -974,8 +973,8 @@ public class Store implements HConstants {
return wantedVersions > maxVersions ? maxVersions: wantedVersions;
}
static void expiredOrDeleted(final Map<KeyValue, Object> set, final KeyValue kv) {
boolean b = set.remove(kv) != null;
static void expiredOrDeleted(final Set<KeyValue> set, final KeyValue kv) {
boolean b = set.remove(kv);
if (LOG.isDebugEnabled()) {
LOG.debug(kv.toString() + " expired: " + b);
}

View File

@ -64,6 +64,8 @@ public class TestOldAPIGetRowVersions extends HBaseClusterTestCase {
BatchUpdate b = new BatchUpdate(ROW, TIMESTAMP);
b.put(COLUMN, Bytes.toBytes(VALUE1));
this.table.commit(b);
Cell c = this.table.get(ROW, COLUMN);
assertEquals(VALUE1, Bytes.toString(c.getValue()));
/* Taking out this recycle of the mini cluster -- it don't work well
* Debug it if fails in TestGetRowVersion, not this old api version.
// Shut down and restart the HBase cluster
@ -72,17 +74,18 @@ public class TestOldAPIGetRowVersions extends HBaseClusterTestCase {
LOG.debug("HBase cluster shut down -- restarting");
this.hBaseClusterSetup();
*/
// Make a new connection
this.table = new HTable(conf, TABLE_NAME);
// Overwrite previous value
b = new BatchUpdate(ROW, TIMESTAMP);
b.put(COLUMN, Bytes.toBytes(VALUE2));
this.table.commit(b);
c = this.table.get(ROW, COLUMN);
LOG.info("Got " + Bytes.toString(c.getValue()));
assertEquals(VALUE2, Bytes.toString(c.getValue()));
// Now verify that getRow(row, column, latest) works
RowResult r = table.getRow(ROW);
assertNotNull(r);
assertTrue(r.size() != 0);
Cell c = r.get(COLUMN);
c = r.get(COLUMN);
assertNotNull(c);
assertTrue(c.getValue().length != 0);
String value = Bytes.toString(c.getValue());

View File

@ -191,11 +191,11 @@ public class TestHRegion extends HBaseTestCase {
//checkAndPut with wrong value
Store store = region.getStore(fam1);
int size = store.memstore.memstore.size();
int size = store.memstore.kvset.size();
boolean res = region.checkAndPut(row1, fam1, qf1, val1, put, lockId, true);
assertEquals(true, res);
size = store.memstore.memstore.size();
size = store.memstore.kvset.size();
Get get = new Get(row1);
get.addColumn(fam2, qf1);
@ -414,12 +414,10 @@ public class TestHRegion extends HBaseTestCase {
// extract the key values out the memstore:
// This is kinda hacky, but better than nothing...
long now = System.currentTimeMillis();
KeyValue firstKv = region.getStore(fam1).memstore.memstore.firstKey();
KeyValue firstKv = region.getStore(fam1).memstore.kvset.first();
assertTrue(firstKv.getTimestamp() <= now);
now = firstKv.getTimestamp();
for (Map.Entry<KeyValue, ?> entry:
region.getStore(fam1).memstore.memstore.entrySet()) {
KeyValue kv = entry.getKey();
for (KeyValue kv: region.getStore(fam1).memstore.kvset) {
assertTrue(kv.getTimestamp() <= now);
now = kv.getTimestamp();
}

View File

@ -0,0 +1,147 @@
/**
* Copyright 2009 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.Iterator;
import java.util.SortedSet;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
import junit.framework.TestCase;
public class TestKeyValueSkipListSet extends TestCase {
private final KeyValueSkipListSet kvsls =
new KeyValueSkipListSet(KeyValue.COMPARATOR);
protected void setUp() throws Exception {
super.setUp();
this.kvsls.clear();
}
public void testAdd() throws Exception {
byte [] bytes = Bytes.toBytes(getName());
KeyValue kv = new KeyValue(bytes, bytes, bytes, bytes);
this.kvsls.add(kv);
assertTrue(this.kvsls.contains(kv));
assertEquals(1, this.kvsls.size());
KeyValue first = this.kvsls.first();
assertTrue(kv.equals(first));
assertTrue(Bytes.equals(kv.getValue(), first.getValue()));
// Now try overwritting
byte [] overwriteValue = Bytes.toBytes("overwrite");
KeyValue overwrite = new KeyValue(bytes, bytes, bytes, overwriteValue);
this.kvsls.add(overwrite);
assertEquals(1, this.kvsls.size());
first = this.kvsls.first();
assertTrue(Bytes.equals(overwrite.getValue(), first.getValue()));
assertFalse(Bytes.equals(overwrite.getValue(), kv.getValue()));
}
public void testIterator() throws Exception {
byte [] bytes = Bytes.toBytes(getName());
byte [] value1 = Bytes.toBytes("1");
byte [] value2 = Bytes.toBytes("2");
final int total = 3;
for (int i = 0; i < total; i++) {
this.kvsls.add(new KeyValue(bytes, bytes, Bytes.toBytes("" + i), value1));
}
// Assert that we added 'total' values and that they are in order
int count = 0;
for (KeyValue kv: this.kvsls) {
assertEquals("" + count, Bytes.toString(kv.getQualifier()));
assertTrue(Bytes.equals(kv.getValue(), value1));
count++;
}
assertEquals(total, count);
// Now overwrite with a new value.
for (int i = 0; i < total; i++) {
this.kvsls.add(new KeyValue(bytes, bytes, Bytes.toBytes("" + i), value2));
}
// Assert that we added 'total' values and that they are in order and that
// we are getting back value2
count = 0;
for (KeyValue kv: this.kvsls) {
assertEquals("" + count, Bytes.toString(kv.getQualifier()));
assertTrue(Bytes.equals(kv.getValue(), value2));
count++;
}
assertEquals(total, count);
}
public void testDescendingIterator() throws Exception {
byte [] bytes = Bytes.toBytes(getName());
byte [] value1 = Bytes.toBytes("1");
byte [] value2 = Bytes.toBytes("2");
final int total = 3;
for (int i = 0; i < total; i++) {
this.kvsls.add(new KeyValue(bytes, bytes, Bytes.toBytes("" + i), value1));
}
// Assert that we added 'total' values and that they are in order
int count = 0;
for (Iterator<KeyValue> i = this.kvsls.descendingIterator(); i.hasNext();) {
KeyValue kv = i.next();
assertEquals("" + (total - (count + 1)), Bytes.toString(kv.getQualifier()));
assertTrue(Bytes.equals(kv.getValue(), value1));
count++;
}
assertEquals(total, count);
// Now overwrite with a new value.
for (int i = 0; i < total; i++) {
this.kvsls.add(new KeyValue(bytes, bytes, Bytes.toBytes("" + i), value2));
}
// Assert that we added 'total' values and that they are in order and that
// we are getting back value2
count = 0;
for (Iterator<KeyValue> i = this.kvsls.descendingIterator(); i.hasNext();) {
KeyValue kv = i.next();
assertEquals("" + (total - (count + 1)), Bytes.toString(kv.getQualifier()));
assertTrue(Bytes.equals(kv.getValue(), value2));
count++;
}
assertEquals(total, count);
}
public void testHeadTail() throws Exception {
byte [] bytes = Bytes.toBytes(getName());
byte [] value1 = Bytes.toBytes("1");
byte [] value2 = Bytes.toBytes("2");
final int total = 3;
KeyValue splitter = null;
for (int i = 0; i < total; i++) {
KeyValue kv = new KeyValue(bytes, bytes, Bytes.toBytes("" + i), value1);
if (i == 1) splitter = kv;
this.kvsls.add(kv);
}
SortedSet<KeyValue> tail = this.kvsls.tailSet(splitter);
assertEquals(2, tail.size());
SortedSet<KeyValue> head = this.kvsls.headSet(splitter);
assertEquals(1, head.size());
// Now ensure that we get back right answer even when we do tail or head.
// Now overwrite with a new value.
for (int i = 0; i < total; i++) {
this.kvsls.add(new KeyValue(bytes, bytes, Bytes.toBytes("" + i), value2));
}
tail = this.kvsls.tailSet(splitter);
assertTrue(Bytes.equals(tail.first().getValue(), value2));
head = this.kvsls.headSet(splitter);
assertTrue(Bytes.equals(head.first().getValue(), value2));
}
}

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import java.rmi.UnexpectedException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
@ -54,6 +53,19 @@ public class TestMemStore extends TestCase {
this.memstore = new MemStore();
}
public void testPutSameKey() {
byte [] bytes = Bytes.toBytes(getName());
KeyValue kv = new KeyValue(bytes, bytes, bytes, bytes);
this.memstore.add(kv);
byte [] other = Bytes.toBytes("somethingelse");
KeyValue samekey = new KeyValue(bytes, bytes, bytes, other);
this.memstore.add(samekey);
KeyValue found = this.memstore.kvset.first();
assertEquals(1, this.memstore.kvset.size());
assertTrue(Bytes.toString(found.getValue()), Bytes.equals(samekey.getValue(),
found.getValue()));
}
/**
* Test memstore snapshot happening while scanning.
* @throws IOException
@ -108,7 +120,7 @@ public class TestMemStore extends TestCase {
for (int i = 0; i < snapshotCount; i++) {
addRows(this.memstore);
runSnapshot(this.memstore);
Map<KeyValue, ?> ss = this.memstore.getSnapshot();
KeyValueSkipListSet ss = this.memstore.getSnapshot();
assertEquals("History not being cleared", 0, ss.size());
}
}
@ -130,7 +142,7 @@ public class TestMemStore extends TestCase {
m.add(key2);
assertTrue("Expected memstore to hold 3 values, actually has " +
m.memstore.size(), m.memstore.size() == 3);
m.kvset.size(), m.kvset.size() == 3);
}
public void testBinary() throws IOException {
@ -153,9 +165,9 @@ public class TestMemStore extends TestCase {
// System.out.println(key);
}
int index = start;
for (Map.Entry<KeyValue, ?> entry: mc.memstore.entrySet()) {
System.out.println(entry);
byte [] b = entry.getKey().getRow();
for (KeyValue kv: mc.kvset) {
System.out.println(kv);
byte [] b = kv.getRow();
// Hardcoded offsets into String
String str = Bytes.toString(b, 13, 4);
byte [] bb = Bytes.toBytes(index);
@ -359,10 +371,10 @@ public class TestMemStore extends TestCase {
memstore.snapshot();
assertEquals(3, memstore.snapshot.size());
//Adding value to "new" memstore
assertEquals(0, memstore.memstore.size());
assertEquals(0, memstore.kvset.size());
memstore.add(new KeyValue(row, fam ,qf4, val));
memstore.add(new KeyValue(row, fam ,qf5, val));
assertEquals(2, memstore.memstore.size());
assertEquals(2, memstore.kvset.size());
List<KeyValue> result = new ArrayList<KeyValue>();
boolean res = memstore.get(matcher, result);
@ -443,7 +455,7 @@ public class TestMemStore extends TestCase {
memstore.add(put2);
memstore.add(put3);
assertEquals(3, memstore.memstore.size());
assertEquals(3, memstore.kvset.size());
KeyValue del2 = new KeyValue(row, fam, qf1, ts2, KeyValue.Type.Delete, val);
memstore.delete(del2);
@ -453,10 +465,10 @@ public class TestMemStore extends TestCase {
expected.add(del2);
expected.add(put1);
assertEquals(3, memstore.memstore.size());
assertEquals(3, memstore.kvset.size());
int i = 0;
for(Map.Entry<KeyValue, ?> entry : memstore.memstore.entrySet()) {
assertEquals(expected.get(i++), entry.getKey());
for(KeyValue kv : memstore.kvset) {
assertEquals(expected.get(i++), kv);
}
}
@ -476,7 +488,7 @@ public class TestMemStore extends TestCase {
memstore.add(put2);
memstore.add(put3);
assertEquals(3, memstore.memstore.size());
assertEquals(3, memstore.kvset.size());
KeyValue del2 =
new KeyValue(row, fam, qf1, ts2, KeyValue.Type.DeleteColumn, val);
@ -486,10 +498,10 @@ public class TestMemStore extends TestCase {
expected.add(put3);
expected.add(del2);
assertEquals(2, memstore.memstore.size());
assertEquals(2, memstore.kvset.size());
int i = 0;
for(Map.Entry<KeyValue, ?> entry : memstore.memstore.entrySet()) {
assertEquals(expected.get(i++), entry.getKey());
for (KeyValue kv: memstore.kvset) {
assertEquals(expected.get(i++), kv);
}
}
@ -521,10 +533,10 @@ public class TestMemStore extends TestCase {
expected.add(del);
expected.add(put4);
assertEquals(2, memstore.memstore.size());
assertEquals(2, memstore.kvset.size());
int i = 0;
for(Map.Entry<KeyValue, ?> entry : memstore.memstore.entrySet()) {
assertEquals(expected.get(i++), entry.getKey());
for (KeyValue kv: memstore.kvset) {
assertEquals(expected.get(i++), kv);
}
}
@ -537,8 +549,8 @@ public class TestMemStore extends TestCase {
memstore.add(new KeyValue(row, fam, qf, ts, val));
KeyValue delete = new KeyValue(row, fam, qf, ts, KeyValue.Type.Delete, val);
memstore.delete(delete);
assertEquals(1, memstore.memstore.size());
assertEquals(delete, memstore.memstore.firstKey());
assertEquals(1, memstore.kvset.size());
assertEquals(delete, memstore.kvset.first());
}
public void testRetainsDeleteVersion() throws IOException {
@ -550,8 +562,8 @@ public class TestMemStore extends TestCase {
"row1", "fam", "a", 100, KeyValue.Type.Delete, "dont-care");
memstore.delete(delete);
assertEquals(1, memstore.memstore.size());
assertEquals(delete, memstore.memstore.firstKey());
assertEquals(1, memstore.kvset.size());
assertEquals(delete, memstore.kvset.first());
}
public void testRetainsDeleteColumn() throws IOException {
// add a put to memstore
@ -562,8 +574,8 @@ public class TestMemStore extends TestCase {
KeyValue.Type.DeleteColumn, "dont-care");
memstore.delete(delete);
assertEquals(1, memstore.memstore.size());
assertEquals(delete, memstore.memstore.firstKey());
assertEquals(1, memstore.kvset.size());
assertEquals(delete, memstore.kvset.first());
}
public void testRetainsDeleteFamily() throws IOException {
// add a put to memstore
@ -574,8 +586,8 @@ public class TestMemStore extends TestCase {
KeyValue.Type.DeleteFamily, "dont-care");
memstore.delete(delete);
assertEquals(1, memstore.memstore.size());
assertEquals(delete, memstore.memstore.firstKey());
assertEquals(1, memstore.kvset.size());
assertEquals(delete, memstore.kvset.first());
}
@ -609,7 +621,7 @@ public class TestMemStore extends TestCase {
// Save off old state.
int oldHistorySize = hmc.getSnapshot().size();
hmc.snapshot();
Map<KeyValue, ?> ss = hmc.getSnapshot();
KeyValueSkipListSet ss = hmc.getSnapshot();
// Make some assertions about what just happened.
assertTrue("History size has not increased", oldHistorySize < ss.size());
hmc.clearSnapshot(ss);

View File

@ -181,7 +181,7 @@ public class TestStore extends TestCase {
this.store.snapshot();
this.store.flushCache(id++);
assertEquals(storeFilessize, this.store.getStorefiles().size());
assertEquals(0, this.store.memstore.memstore.size());
assertEquals(0, this.store.memstore.kvset.size());
}
private void assertCheck() {