HBASE-1503 hbase-1304 dropped updating list of store files on flush

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@785009 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2009-06-15 22:23:04 +00:00
parent e8d66ea285
commit aed9c07cd8
5 changed files with 90 additions and 116 deletions

View File

@ -186,6 +186,8 @@ Release 0.20.0 - Unreleased
HBASE-1523 NPE in BaseScanner HBASE-1523 NPE in BaseScanner
HBASE-1525 HTable.incrementColumnValue hangs() HBASE-1525 HTable.incrementColumnValue hangs()
HBASE-1526 mapreduce fixup HBASE-1526 mapreduce fixup
HBASE-1503 hbase-1304 dropped updating list of store files on flush
(jgray via stack)
IMPROVEMENTS IMPROVEMENTS
HBASE-1089 Add count of regions on filesystem to master UI; add percentage HBASE-1089 Add count of regions on filesystem to master UI; add percentage

View File

@ -41,11 +41,8 @@ import org.apache.hadoop.hbase.KeyValue.KVComparator;
* as an InternalScanner at the Store level, you will get runtime exceptions. * as an InternalScanner at the Store level, you will get runtime exceptions.
*/ */
public class KeyValueHeap implements KeyValueScanner, InternalScanner { public class KeyValueHeap implements KeyValueScanner, InternalScanner {
private PriorityQueue<KeyValueScanner> heap; private PriorityQueue<KeyValueScanner> heap;
private KeyValueScanner current = null; private KeyValueScanner current = null;
private KVScannerComparator comparator; private KVScannerComparator comparator;
/** /**

View File

@ -539,12 +539,18 @@ class Memcache {
} }
/** /**
* @return a scanner over the keys in the Memcache * @return scanner on memcache and snapshot in this order (if snapshot is
* empty, returns only memcache scanner).
*/ */
KeyValueScanner getScanner() { KeyValueScanner [] getScanners() {
this.lock.readLock().lock(); this.lock.readLock().lock();
try { try {
return new MemcacheScanner(); boolean noss = this.snapshot == null || this.snapshot.isEmpty();
KeyValueScanner [] scanners =
new KeyValueScanner[noss? 1: 2];
scanners[0] = new MemcacheScanner(this.memcache);
if (!noss) scanners[1] = new MemcacheScanner(this.snapshot);
return scanners;
} finally { } finally {
this.lock.readLock().unlock(); this.lock.readLock().unlock();
} }
@ -618,27 +624,30 @@ class Memcache {
return false; return false;
} }
//////////////////////////////////////////////////////////////////////////////
// MemcacheScanner implements the KeyValueScanner.
// It lets the caller scan the contents of the Memcache.
// This behaves as if it were a real scanner but does not maintain position
// in the Memcache tree.
//////////////////////////////////////////////////////////////////////////////
/*
* MemcacheScanner implements the KeyValueScanner.
* It lets the caller scan the contents of a memcache.
* This behaves as if it were a real scanner but does not maintain position
* in the passed memcache tree.
*/
protected class MemcacheScanner implements KeyValueScanner { protected class MemcacheScanner implements KeyValueScanner {
private final NavigableSet<KeyValue> mc;
private KeyValue current = null; private KeyValue current = null;
private List<KeyValue> result = new ArrayList<KeyValue>(); private List<KeyValue> result = new ArrayList<KeyValue>();
private int idx = 0; private int idx = 0;
MemcacheScanner() {} MemcacheScanner(final NavigableSet<KeyValue> mc) {
this.mc = mc;
}
public boolean seek(KeyValue key) { public boolean seek(KeyValue key) {
try { try {
if(key == null) { if (key == null) {
close(); close();
return false; return false;
} }
current = key; this.current = key;
return cacheNextRow(); return cacheNextRow();
} catch(Exception e) { } catch(Exception e) {
close(); close();
@ -647,8 +656,8 @@ class Memcache {
} }
public KeyValue peek() { public KeyValue peek() {
if(idx >= result.size()) { if (idx >= this.result.size()) {
if(!cacheNextRow()) { if (!cacheNextRow()) {
return null; return null;
} }
return peek(); return peek();
@ -657,32 +666,36 @@ class Memcache {
} }
public KeyValue next() { public KeyValue next() {
if(idx >= result.size()) { if (idx >= result.size()) {
if(!cacheNextRow()) { if (!cacheNextRow()) {
return null; return null;
} }
return next(); return next();
} }
return result.get(idx++); return this.result.get(idx++);
} }
/**
* @return True if we successfully cached a NavigableSet aligned on
* next row.
*/
boolean cacheNextRow() { boolean cacheNextRow() {
NavigableSet<KeyValue> keys; SortedSet<KeyValue> keys;
try { try {
keys = memcache.tailSet(current); keys = this.mc.tailSet(this.current);
} catch(Exception e) { } catch (Exception e) {
close(); close();
return false; return false;
} }
if(keys == null || keys.isEmpty()) { if (keys == null || keys.isEmpty()) {
close(); close();
return false; return false;
} }
current = null; this.current = null;
byte [] row = keys.first().getRow(); byte [] row = keys.first().getRow();
for(KeyValue key : keys) { for (KeyValue key: keys) {
if(comparator.compareRows(key, row) != 0) { if (comparator.compareRows(key, row) != 0) {
current = key; this.current = key;
break; break;
} }
result.add(key); result.add(key);
@ -693,7 +706,7 @@ class Memcache {
public void close() { public void close() {
current = null; current = null;
idx = 0; idx = 0;
if(!result.isEmpty()) { if (!result.isEmpty()) {
result.clear(); result.clear();
} }
} }

View File

@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NavigableSet; import java.util.NavigableSet;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -38,19 +37,12 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
* Scanner scans both the memcache and the HStore. Coaleace KeyValue stream * Scanner scans both the memcache and the HStore. Coaleace KeyValue stream
* into List<KeyValue> for a single row. * into List<KeyValue> for a single row.
*/ */
class StoreScanner implements KeyValueScanner, InternalScanner, class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
ChangedReadersObserver {
static final Log LOG = LogFactory.getLog(StoreScanner.class); static final Log LOG = LogFactory.getLog(StoreScanner.class);
private Store store; private Store store;
private ScanQueryMatcher matcher; private ScanQueryMatcher matcher;
private KeyValueHeap heap; private KeyValueHeap heap;
// Used around transition from no storefile to the first.
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// Used to indicate that the scanner has closed (see HBASE-1107) // Used to indicate that the scanner has closed (see HBASE-1107)
private final AtomicBoolean closing = new AtomicBoolean(false); private final AtomicBoolean closing = new AtomicBoolean(false);
@ -63,8 +55,7 @@ ChangedReadersObserver {
columns, store.ttl, store.comparator.getRawComparator(), columns, store.ttl, store.comparator.getRawComparator(),
store.versionsToReturn(scan.getMaxVersions())); store.versionsToReturn(scan.getMaxVersions()));
List<KeyValueScanner> scanners = getStoreFileScanners(); List<KeyValueScanner> scanners = getScanners();
scanners.add(store.memcache.getScanner());
// Seek all scanners to the initial key // Seek all scanners to the initial key
for(KeyValueScanner scanner : scanners) { for(KeyValueScanner scanner : scanners) {
@ -96,7 +87,19 @@ ChangedReadersObserver {
scanners, comparator); scanners, comparator);
} }
public KeyValue peek() { /*
* @return List of scanners ordered properly.
*/
private List<KeyValueScanner> getScanners() {
List<KeyValueScanner> scanners = getStoreFileScanners();
KeyValueScanner [] memcachescanners = this.store.memcache.getScanners();
for (int i = memcachescanners.length - 1; i >= 0; i--) {
scanners.add(memcachescanners[i]);
}
return scanners;
}
public synchronized KeyValue peek() {
return this.heap.peek(); return this.heap.peek();
} }
@ -105,7 +108,7 @@ ChangedReadersObserver {
throw new RuntimeException("Never call StoreScanner.next()"); throw new RuntimeException("Never call StoreScanner.next()");
} }
public void close() { public synchronized void close() {
this.closing.set(true); this.closing.set(true);
// under test, we dont have a this.store // under test, we dont have a this.store
if (this.store != null) if (this.store != null)
@ -113,8 +116,7 @@ ChangedReadersObserver {
this.heap.close(); this.heap.close();
} }
public boolean seek(KeyValue key) { public synchronized boolean seek(KeyValue key) {
return this.heap.seek(key); return this.heap.seek(key);
} }
@ -123,7 +125,7 @@ ChangedReadersObserver {
* @param result * @param result
* @return true if there are more rows, false if scanner is done * @return true if there are more rows, false if scanner is done
*/ */
public boolean next(List<KeyValue> result) throws IOException { public synchronized boolean next(List<KeyValue> result) throws IOException {
KeyValue peeked = this.heap.peek(); KeyValue peeked = this.heap.peek();
if (peeked == null) { if (peeked == null) {
close(); close();
@ -138,6 +140,7 @@ ChangedReadersObserver {
KeyValue next = this.heap.next(); KeyValue next = this.heap.next();
result.add(next); result.add(next);
continue; continue;
case DONE: case DONE:
// what happens if we have 0 results? // what happens if we have 0 results?
if (result.isEmpty()) { if (result.isEmpty()) {
@ -159,14 +162,6 @@ ChangedReadersObserver {
return false; return false;
case SEEK_NEXT_ROW: case SEEK_NEXT_ROW:
// TODO see comments in SEEK_NEXT_COL
/*
KeyValue rowToSeek =
new KeyValue(kv.getRow(),
0,
KeyValue.Type.Minimum);
heap.seek(rowToSeek);
*/
heap.next(); heap.next();
break; break;
@ -174,45 +169,6 @@ ChangedReadersObserver {
// TODO hfile needs 'hinted' seeking to prevent it from // TODO hfile needs 'hinted' seeking to prevent it from
// reseeking from the start of the block on every dang seek. // reseeking from the start of the block on every dang seek.
// We need that API and expose it the scanner chain. // We need that API and expose it the scanner chain.
/*
ColumnCount hint = matcher.getSeekColumn();
KeyValue colToSeek;
if (hint == null) {
// seek to the 'last' key on this column, this is defined
// as the key with the same row, fam, qualifier,
// smallest timestamp, largest type.
colToSeek =
new KeyValue(kv.getRow(),
kv.getFamily(),
kv.getColumn(),
Long.MIN_VALUE,
KeyValue.Type.Minimum);
} else {
// This is ugmo. Move into KeyValue convience method.
// First key on a column is:
// same row, cf, qualifier, max_timestamp, max_type, no value.
colToSeek =
new KeyValue(kv.getRow(),
0,
kv.getRow().length,
kv.getFamily(),
0,
kv.getFamily().length,
hint.getBuffer(),
hint.getOffset(),
hint.getLength(),
Long.MAX_VALUE,
KeyValue.Type.Maximum,
null,
0,
0);
}
heap.seek(colToSeek);
*/
heap.next(); heap.next();
break; break;
@ -245,18 +201,24 @@ ChangedReadersObserver {
} }
// Implementation of ChangedReadersObserver // Implementation of ChangedReadersObserver
public void updateReaders() throws IOException { public synchronized void updateReaders() throws IOException {
if (this.closing.get()) { if (this.closing.get()) return;
return; KeyValue topKey = this.peek();
} if (topKey == null) return;
this.lock.writeLock().lock();
try { List<KeyValueScanner> scanners = getScanners();
// Could do this pretty nicely with KeyValueHeap, but the existing
// implementation of this method only updated if no existing storefiles? // Seek all scanners to the initial key
// Lets discuss. for(KeyValueScanner scanner : scanners) {
return; scanner.seek(topKey);
} finally {
this.lock.writeLock().unlock();
} }
// Combine all seeked scanners with a heap
heap = new KeyValueHeap(
scanners.toArray(new KeyValueScanner[scanners.size()]), store.comparator);
// Reset the state of the Query Matcher and set to top row
matcher.reset();
matcher.setRow(heap.peek().getRow());
} }
} }

View File

@ -218,7 +218,7 @@ public class TestMemcache extends TestCase {
InternalScanner scanner = InternalScanner scanner =
new StoreScanner(new Scan(Bytes.toBytes(startRowId)), FAMILY, new StoreScanner(new Scan(Bytes.toBytes(startRowId)), FAMILY,
Integer.MAX_VALUE, this.memcache.comparator, null, Integer.MAX_VALUE, this.memcache.comparator, null,
new KeyValueScanner[]{memcache.getScanner()}); new KeyValueScanner[]{memcache.getScanners()[0]});
List<KeyValue> results = new ArrayList<KeyValue>(); List<KeyValue> results = new ArrayList<KeyValue>();
for (int i = 0; scanner.next(results); i++) { for (int i = 0; scanner.next(results); i++) {
int rowId = startRowId + i; int rowId = startRowId + i;