diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index 747a90b783d..6727d850f07 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -673,6 +673,10 @@ public class MemStore implements HeapSize {
     private Iterator<KeyValue> kvsetIt;
     private Iterator<KeyValue> snapshotIt;
 
+    // The kvset and snapshot at the time of creating this scanner
+    volatile KeyValueSkipListSet kvsetAtCreation;
+    volatile KeyValueSkipListSet snapshotAtCreation;
+
     // Sub lists on which we're iterating
     private SortedSet<KeyValue> kvTail;
     private SortedSet<KeyValue> snapshotTail;
@@ -703,6 +707,9 @@ public class MemStore implements HeapSize {
 
     MemStoreScanner() {
       super();
+
+      kvsetAtCreation = kvset;
+      snapshotAtCreation = snapshot;
     }
 
     protected KeyValue getNext(Iterator<KeyValue> it) {
@@ -734,8 +741,8 @@
 
       // kvset and snapshot will never be null.
       // if tailSet can't find anything, SortedSet is empty (not null).
-      kvTail = kvset.tailSet(key);
-      snapshotTail = snapshot.tailSet(key);
+      kvTail = kvsetAtCreation.tailSet(key);
+      snapshotTail = snapshotAtCreation.tailSet(key);
 
       return seekInSubLists(key);
     }
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index d8755f5bfbf..96b4b4e8b0f 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -424,10 +424,15 @@ public class Store extends SchemaConfigured implements HeapSize {
       ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles);
       newFiles.add(sf);
       this.storefiles = sortAndClone(newFiles);
-      notifyChangedReadersObservers();
     } finally {
+      // We need the lock as long as we are updating the storefiles
+      // or changing the memstore. Let us release it before calling
+      // notifyChangedReadersObservers. See HBASE-4485 for a possible
+      // deadlock scenario that could have happened if we continued to
+      // hold the lock.
       this.lock.writeLock().unlock();
     }
+    notifyChangedReadersObservers();
     LOG.info("Successfully loaded store file " + srcPath
         + " into store " + this + " (new location: " + dstPath + ")");
   }
@@ -671,15 +676,21 @@ public class Store extends SchemaConfigured implements HeapSize {
 
       ArrayList<StoreFile> newList = new ArrayList<StoreFile>(storefiles);
       newList.add(sf);
       storefiles = sortAndClone(newList);
+
       this.memstore.clearSnapshot(set);
-
-      // Tell listeners of the change in readers.
-      notifyChangedReadersObservers();
-
-      return needsCompaction();
     } finally {
+      // We need the lock as long as we are updating the storefiles
+      // or changing the memstore. Let us release it before calling
+      // notifyChangedReadersObservers. See HBASE-4485 for a possible
+      // deadlock scenario that could have happened if we continued to
+      // hold the lock.
       this.lock.writeLock().unlock();
     }
+
+    // Tell listeners of the change in readers.
+    notifyChangedReadersObservers();
+
+    return needsCompaction();
   }
 
@@ -692,6 +703,35 @@ public class Store extends SchemaConfigured implements HeapSize {
     }
   }
 
+  protected List<KeyValueScanner> getScanners(boolean cacheBlocks,
+      boolean isGet,
+      boolean isCompaction,
+      ScanQueryMatcher matcher) throws IOException {
+    List<StoreFile> storeFiles;
+    List<KeyValueScanner> memStoreScanners;
+    this.lock.readLock().lock();
+    try {
+      storeFiles = this.getStorefiles();
+      memStoreScanners = this.memstore.getScanners();
+    } finally {
+      this.lock.readLock().unlock();
+    }
+
+    // First the store file scanners
+
+    // TODO this used to get the store files in descending order,
+    // but now we get them in ascending order, which I think is
+    // actually more correct, since the memstore gets put at the end.
+    List<StoreFileScanner> sfScanners = StoreFileScanner
+      .getScannersForStoreFiles(storeFiles, cacheBlocks, isGet, isCompaction, matcher);
+    List<KeyValueScanner> scanners =
+      new ArrayList<KeyValueScanner>(sfScanners.size()+1);
+    scanners.addAll(sfScanners);
+    // Then the memstore scanners
+    scanners.addAll(memStoreScanners);
+    return scanners;
+  }
+
   /*
    * @param o Observer who wants to know about changes in set of Readers
    */
@@ -1381,8 +1421,8 @@ public class Store extends SchemaConfigured implements HeapSize {
           this.family.getBloomFilterType());
       result.createReader();
     }
-    this.lock.writeLock().lock();
     try {
+      this.lock.writeLock().lock();
       try {
         // Change this.storefiles so it reflects new state but do not
         // delete old store files until we have sent out notification of
@@ -1398,34 +1438,40 @@ public class Store extends SchemaConfigured implements HeapSize {
         }
         this.storefiles = sortAndClone(newStoreFiles);
+      } finally {
+        // We need the lock as long as we are updating the storefiles
+        // or changing the memstore. Let us release it before calling
+        // notifyChangedReadersObservers. See HBASE-4485 for a possible
+        // deadlock scenario that could have happened if we continued to
+        // hold the lock.
+        this.lock.writeLock().unlock();
+      }
 
-        // Tell observers that list of StoreFiles has changed.
-        notifyChangedReadersObservers();
-        // Finally, delete old store files.
-        for (StoreFile hsf: compactedFiles) {
-          hsf.deleteReader();
-        }
-      } catch (IOException e) {
-        e = RemoteExceptionHandler.checkIOException(e);
-        LOG.error("Failed replacing compacted files in " + this.storeNameStr +
-          ". Compacted file is " + (result == null? "none": result.toString()) +
-          ". Files replaced " + compactedFiles.toString() +
-          " some of which may have been already removed", e);
+      // Tell observers that list of StoreFiles has changed.
+      notifyChangedReadersObservers();
+      // Finally, delete old store files.
+      for (StoreFile hsf: compactedFiles) {
+        hsf.deleteReader();
       }
-      // 4. Compute new store size
-      this.storeSize = 0L;
-      this.totalUncompressedBytes = 0L;
-      for (StoreFile hsf : this.storefiles) {
-        StoreFile.Reader r = hsf.getReader();
-        if (r == null) {
-          LOG.warn("StoreFile " + hsf + " has a null Reader");
-          continue;
-        }
-        this.storeSize += r.length();
-        this.totalUncompressedBytes += r.getTotalUncompressedBytes();
+    } catch (IOException e) {
+      e = RemoteExceptionHandler.checkIOException(e);
+      LOG.error("Failed replacing compacted files in " + this.storeNameStr +
+        ". Compacted file is " + (result == null? "none": result.toString()) +
+        ". Files replaced " + compactedFiles.toString() +
+        " some of which may have been already removed", e);
+    }
+
+    // 4. Compute new store size
+    this.storeSize = 0L;
+    this.totalUncompressedBytes = 0L;
+    for (StoreFile hsf : this.storefiles) {
+      StoreFile.Reader r = hsf.getReader();
+      if (r == null) {
+        LOG.warn("StoreFile " + hsf + " has a null Reader");
+        continue;
       }
-    } finally {
-      this.lock.writeLock().unlock();
+      this.storeSize += r.length();
+      this.totalUncompressedBytes += r.getTotalUncompressedBytes();
     }
 
     return result;
   }
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index d1e8a2c102b..80709f750ef 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -172,20 +172,7 @@ class StoreScanner extends NonLazyKeyValueScanner
    * @return List of scanners ordered properly.
    */
   private List<KeyValueScanner> getScanners() throws IOException {
-    // First the store file scanners
-
-    // TODO this used to get the store files in descending order,
-    // but now we get them in ascending order, which I think is
-    // actually more correct, since memstore get put at the end.
-    List<StoreFileScanner> sfScanners = StoreFileScanner
-      .getScannersForStoreFiles(store.getStorefiles(), cacheBlocks, isGet,
-        false);
-    List<KeyValueScanner> scanners =
-      new ArrayList<KeyValueScanner>(sfScanners.size()+1);
-    scanners.addAll(sfScanners);
-    // Then the memstore scanners
-    scanners.addAll(this.store.memstore.getScanners());
-    return scanners;
+    return this.store.getScanners(cacheBlocks, isGet, false, null);
   }
 
   /*
@@ -203,25 +190,25 @@ class StoreScanner extends NonLazyKeyValueScanner
       memOnly = false;
       filesOnly = false;
     }
 
-    List<KeyValueScanner> scanners = new LinkedList<KeyValueScanner>();
-    // First the store file scanners
-    if (memOnly == false) {
-      List<StoreFileScanner> sfScanners = StoreFileScanner
-        .getScannersForStoreFiles(store.getStorefiles(), cacheBlocks,
-          isGet, false, this.matcher);
+    List<KeyValueScanner> allStoreScanners =
+      this.store.getScanners(cacheBlocks, isGet, false, this.matcher);
 
-      // include only those scan files which pass all filters
-      for (StoreFileScanner sfs : sfScanners) {
-        if (sfs.shouldSeek(scan, columns)) {
-          scanners.add(sfs);
-        }
+    List<KeyValueScanner> scanners =
+      new ArrayList<KeyValueScanner>(allStoreScanners.size());
+
+    // include only those scan files which pass all filters
+    for (KeyValueScanner kvs : allStoreScanners) {
+      if (kvs instanceof StoreFileScanner) {
+        if (memOnly == false && ((StoreFileScanner)kvs).shouldSeek(scan, columns))
+          scanners.add(kvs);
+      }
+      else {
+        // kvs is a MemStoreScanner
+        if (filesOnly == false && this.store.memstore.shouldSeek(scan))
+          scanners.add(kvs);
       }
     }
-    // Then the memstore scanners
-    if ((filesOnly == false) && (this.store.memstore.shouldSeek(scan))) {
-      scanners.addAll(this.store.memstore.getScanners());
-    }
 
     return scanners;
   }
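
For context, the locking pattern this patch applies in Store.java can be summarized as: mutate the guarded state while holding the write lock, release the lock, and only then call back into observers, so that an observer callback which needs the read lock again (as the new Store.getScanners does) can never deadlock against a writer that is still holding the write lock. Below is a minimal standalone sketch of that pattern, not HBase code; the names FileSet, addFile, and addObserver are made up for illustration.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative only -- a sketch of "update under the write lock, notify after
// releasing it", the shape of the HBASE-4485 fix above.
class FileSet {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final List<Runnable> observers = new CopyOnWriteArrayList<Runnable>();
  private volatile List<String> files = Collections.emptyList();

  void addObserver(Runnable o) { observers.add(o); }

  void addFile(String f) {
    // 1. Update the guarded state under the write lock.
    lock.writeLock().lock();
    try {
      List<String> next = new ArrayList<String>(files);
      next.add(f);
      files = next;
    } finally {
      lock.writeLock().unlock();
    }
    // 2. Notify observers only after the write lock is released; an observer
    //    that re-acquires the read lock here cannot deadlock with this writer.
    for (Runnable o : observers) {
      o.run();
    }
  }
}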