diff --git a/CHANGES.txt b/CHANGES.txt
index 7cfca1dfbd5..348b31cb8bb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -186,6 +186,8 @@ Release 0.20.0 - Unreleased
HBASE-1523 NPE in BaseScanner
HBASE-1525 HTable.incrementColumnValue hangs()
HBASE-1526 mapreduce fixup
+ HBASE-1503 hbase-1304 dropped updating list of store files on flush
+ (jgray via stack)
IMPROVEMENTS
HBASE-1089 Add count of regions on filesystem to master UI; add percentage
diff --git a/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
index b1a329f8def..daea26aa42e 100644
--- a/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
+++ b/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
@@ -38,16 +38,13 @@ import org.apache.hadoop.hbase.KeyValue.KVComparator;
*
* In the Region case, we also need InternalScanner.next(List), so this class
* also implements InternalScanner. WARNING: As is, if you try to use this
- * as an InternalScanner at the Store level, you will get runtime exceptions.
+ * as an InternalScanner at the Store level, you will get runtime exceptions.
*/
public class KeyValueHeap implements KeyValueScanner, InternalScanner {
-
private PriorityQueue heap;
-
private KeyValueScanner current = null;
-
private KVScannerComparator comparator;
-
+
/**
* Constructor
* @param scanners
@@ -91,7 +88,7 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
}
return kvReturn;
}
-
+
/**
* Gets the next row of keys from the top-most scanner.
*
@@ -194,4 +191,4 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
public PriorityQueue getHeap() {
return this.heap;
}
-}
+}
\ No newline at end of file
diff --git a/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java b/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java
index 8441d641a68..98456079162 100644
--- a/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java
+++ b/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java
@@ -539,12 +539,18 @@ class Memcache {
}
/**
- * @return a scanner over the keys in the Memcache
+ * @return scanner on memcache and snapshot in this order (if snapshot is
+ * empty, returns only memcache scanner).
*/
- KeyValueScanner getScanner() {
+ KeyValueScanner [] getScanners() {
this.lock.readLock().lock();
try {
- return new MemcacheScanner();
+ boolean noss = this.snapshot == null || this.snapshot.isEmpty();
+ KeyValueScanner [] scanners =
+ new KeyValueScanner[noss? 1: 2];
+ scanners[0] = new MemcacheScanner(this.memcache);
+ if (!noss) scanners[1] = new MemcacheScanner(this.snapshot);
+ return scanners;
} finally {
this.lock.readLock().unlock();
}
@@ -553,7 +559,7 @@ class Memcache {
//
// HBASE-880/1249/1304
//
-
+
/**
* Perform a single-row Get on the memcache and snapshot, placing results
* into the specified KV list.
@@ -618,71 +624,78 @@ class Memcache {
return false;
}
- //////////////////////////////////////////////////////////////////////////////
- // MemcacheScanner implements the KeyValueScanner.
- // It lets the caller scan the contents of the Memcache.
- // This behaves as if it were a real scanner but does not maintain position
- // in the Memcache tree.
- //////////////////////////////////////////////////////////////////////////////
+ /*
+ * MemcacheScanner implements the KeyValueScanner.
+ * It lets the caller scan the contents of a memcache.
+ * This behaves as if it were a real scanner but does not maintain position
+ * in the passed memcache tree.
+ */
protected class MemcacheScanner implements KeyValueScanner {
+ private final NavigableSet mc;
private KeyValue current = null;
private List result = new ArrayList();
private int idx = 0;
-
- MemcacheScanner() {}
-
+
+ MemcacheScanner(final NavigableSet mc) {
+ this.mc = mc;
+ }
+
public boolean seek(KeyValue key) {
try {
- if(key == null) {
+ if (key == null) {
close();
return false;
}
- current = key;
+ this.current = key;
return cacheNextRow();
} catch(Exception e) {
close();
return false;
}
}
-
+
public KeyValue peek() {
- if(idx >= result.size()) {
- if(!cacheNextRow()) {
+ if (idx >= this.result.size()) {
+ if (!cacheNextRow()) {
return null;
}
return peek();
}
return result.get(idx);
}
-
+
public KeyValue next() {
- if(idx >= result.size()) {
- if(!cacheNextRow()) {
+ if (idx >= result.size()) {
+ if (!cacheNextRow()) {
return null;
}
return next();
}
- return result.get(idx++);
+ return this.result.get(idx++);
}
-
+
+ /**
+ * @return True if we successfully cached a NavigableSet aligned on
+ * next row.
+ */
boolean cacheNextRow() {
- NavigableSet keys;
+ SortedSet keys;
try {
- keys = memcache.tailSet(current);
- } catch(Exception e) {
+ keys = this.mc.tailSet(this.current);
+ } catch (Exception e) {
close();
return false;
}
- if(keys == null || keys.isEmpty()) {
+ if (keys == null || keys.isEmpty()) {
close();
return false;
}
- current = null;
+ this.current = null;
byte [] row = keys.first().getRow();
- for(KeyValue key : keys) {
- if(comparator.compareRows(key, row) != 0) {
- current = key;
+ for (KeyValue key: keys) {
+ if (comparator.compareRows(key, row) != 0) {
+ this.current = key;
break;
}
result.add(key);
@@ -693,7 +706,7 @@ class Memcache {
public void close() {
current = null;
idx = 0;
- if(!result.isEmpty()) {
+ if (!result.isEmpty()) {
result.clear();
}
}
@@ -741,4 +754,4 @@ class Memcache {
}
LOG.info("Exiting.");
}
-}
+}
\ No newline at end of file
diff --git a/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index a085ce80ba5..6ac255773c0 100644
--- a/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -38,19 +37,12 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
* Scanner scans both the memcache and the HStore. Coaleace KeyValue stream
* into List for a single row.
*/
-class StoreScanner implements KeyValueScanner, InternalScanner,
-ChangedReadersObserver {
+class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
static final Log LOG = LogFactory.getLog(StoreScanner.class);
-
private Store store;
-
private ScanQueryMatcher matcher;
-
private KeyValueHeap heap;
- // Used around transition from no storefile to the first.
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-
// Used to indicate that the scanner has closed (see HBASE-1107)
private final AtomicBoolean closing = new AtomicBoolean(false);
@@ -63,8 +55,7 @@ ChangedReadersObserver {
columns, store.ttl, store.comparator.getRawComparator(),
store.versionsToReturn(scan.getMaxVersions()));
- List scanners = getStoreFileScanners();
- scanners.add(store.memcache.getScanner());
+ List scanners = getScanners();
// Seek all scanners to the initial key
for(KeyValueScanner scanner : scanners) {
@@ -96,7 +87,19 @@ ChangedReadersObserver {
scanners, comparator);
}
- public KeyValue peek() {
+ /*
+ * @return List of scanners ordered properly.
+ */
+ private List getScanners() {
+ List scanners = getStoreFileScanners();
+ KeyValueScanner [] memcachescanners = this.store.memcache.getScanners();
+ for (int i = memcachescanners.length - 1; i >= 0; i--) {
+ scanners.add(memcachescanners[i]);
+ }
+ return scanners;
+ }
+
+ public synchronized KeyValue peek() {
return this.heap.peek();
}
@@ -105,7 +108,7 @@ ChangedReadersObserver {
throw new RuntimeException("Never call StoreScanner.next()");
}
- public void close() {
+ public synchronized void close() {
this.closing.set(true);
// under test, we dont have a this.store
if (this.store != null)
@@ -113,8 +116,7 @@ ChangedReadersObserver {
this.heap.close();
}
- public boolean seek(KeyValue key) {
-
+ public synchronized boolean seek(KeyValue key) {
return this.heap.seek(key);
}
@@ -123,7 +125,7 @@ ChangedReadersObserver {
* @param result
* @return true if there are more rows, false if scanner is done
*/
- public boolean next(List result) throws IOException {
+ public synchronized boolean next(List result) throws IOException {
KeyValue peeked = this.heap.peek();
if (peeked == null) {
close();
@@ -138,6 +140,7 @@ ChangedReadersObserver {
KeyValue next = this.heap.next();
result.add(next);
continue;
+
case DONE:
// what happens if we have 0 results?
if (result.isEmpty()) {
@@ -159,14 +162,6 @@ ChangedReadersObserver {
return false;
case SEEK_NEXT_ROW:
- // TODO see comments in SEEK_NEXT_COL
- /*
- KeyValue rowToSeek =
- new KeyValue(kv.getRow(),
- 0,
- KeyValue.Type.Minimum);
- heap.seek(rowToSeek);
- */
heap.next();
break;
@@ -174,45 +169,6 @@ ChangedReadersObserver {
// TODO hfile needs 'hinted' seeking to prevent it from
// reseeking from the start of the block on every dang seek.
// We need that API and expose it the scanner chain.
- /*
- ColumnCount hint = matcher.getSeekColumn();
- KeyValue colToSeek;
- if (hint == null) {
- // seek to the 'last' key on this column, this is defined
- // as the key with the same row, fam, qualifier,
- // smallest timestamp, largest type.
- colToSeek =
- new KeyValue(kv.getRow(),
- kv.getFamily(),
- kv.getColumn(),
- Long.MIN_VALUE,
- KeyValue.Type.Minimum);
- } else {
- // This is ugmo. Move into KeyValue convience method.
- // First key on a column is:
- // same row, cf, qualifier, max_timestamp, max_type, no value.
- colToSeek =
- new KeyValue(kv.getRow(),
- 0,
- kv.getRow().length,
-
- kv.getFamily(),
- 0,
- kv.getFamily().length,
-
- hint.getBuffer(),
- hint.getOffset(),
- hint.getLength(),
-
- Long.MAX_VALUE,
- KeyValue.Type.Maximum,
- null,
- 0,
- 0);
- }
- heap.seek(colToSeek);
- */
-
heap.next();
break;
@@ -245,18 +201,24 @@ ChangedReadersObserver {
}
// Implementation of ChangedReadersObserver
- public void updateReaders() throws IOException {
- if (this.closing.get()) {
- return;
- }
- this.lock.writeLock().lock();
- try {
- // Could do this pretty nicely with KeyValueHeap, but the existing
- // implementation of this method only updated if no existing storefiles?
- // Lets discuss.
- return;
- } finally {
- this.lock.writeLock().unlock();
+ public synchronized void updateReaders() throws IOException {
+ if (this.closing.get()) return;
+ KeyValue topKey = this.peek();
+ if (topKey == null) return;
+
+ List scanners = getScanners();
+
+ // Seek all scanners to the initial key
+ for(KeyValueScanner scanner : scanners) {
+ scanner.seek(topKey);
}
+
+ // Combine all seeked scanners with a heap
+ heap = new KeyValueHeap(
+ scanners.toArray(new KeyValueScanner[scanners.size()]), store.comparator);
+
+ // Reset the state of the Query Matcher and set to top row
+ matcher.reset();
+ matcher.setRow(heap.peek().getRow());
}
}
\ No newline at end of file
diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestMemcache.java b/src/test/org/apache/hadoop/hbase/regionserver/TestMemcache.java
index ed018da9395..04ed019b571 100644
--- a/src/test/org/apache/hadoop/hbase/regionserver/TestMemcache.java
+++ b/src/test/org/apache/hadoop/hbase/regionserver/TestMemcache.java
@@ -218,7 +218,7 @@ public class TestMemcache extends TestCase {
InternalScanner scanner =
new StoreScanner(new Scan(Bytes.toBytes(startRowId)), FAMILY,
Integer.MAX_VALUE, this.memcache.comparator, null,
- new KeyValueScanner[]{memcache.getScanner()});
+ new KeyValueScanner[]{memcache.getScanners()[0]});
List results = new ArrayList();
for (int i = 0; scanner.next(results); i++) {
int rowId = startRowId + i;