From fef6d7f48c81d63b12be4f53031bdbf208635cac Mon Sep 17 00:00:00 2001 From: anoopsjohn Date: Sat, 6 Jun 2015 08:02:35 +0530 Subject: [PATCH] HBASE-13827 Delayed scanner close in KeyValueHeap and StoreScanner. --- .../hadoop/hbase/io/HalfStoreFileReader.java | 5 ++ .../hbase/io/hfile/HFileReaderImpl.java | 5 ++ .../hadoop/hbase/io/hfile/HFileScanner.java | 5 ++ .../hbase/regionserver/KeyValueHeap.java | 22 ++++++--- .../regionserver/ReversedKeyValueHeap.java | 6 +-- .../hbase/regionserver/StoreFileScanner.java | 18 +++---- .../hbase/regionserver/StoreScanner.java | 49 +++++++++++++------ .../hbase/regionserver/TestKeyValueHeap.java | 21 ++++++-- 8 files changed, 88 insertions(+), 43 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java index a95da7b0845..78c67343658 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java @@ -288,6 +288,11 @@ public class HalfStoreFileReader extends StoreFile.Reader { public Cell getNextIndexedKey() { return null; } + + @Override + public void close() { + this.delegate.close(); + } }; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 4d1881db3b2..d184d42c826 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -1077,6 +1077,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { new KeyValue.KeyOnlyKeyValue(blockBuffer.array(), blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen)); } + + @Override + public void close() { + // HBASE-12295 will add code here. + } } public Path getPath() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java index 2b6e011e072..6b527f6ef3d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java @@ -149,4 +149,9 @@ public interface HFileScanner { * @return the next key in the index (the key to seek to the next block) */ Cell getNextIndexedKey(); + + /** + * Close this HFile scanner and do necessary cleanup. + */ + void close(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index 9220d074891..a12e7c351eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -21,8 +21,10 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.Comparator; +import java.util.HashSet; import java.util.List; import java.util.PriorityQueue; +import java.util.Set; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; @@ -59,7 +61,9 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner protected KeyValueScanner current = null; protected KVScannerComparator comparator; - + + protected Set scannersForDelayedClose = new HashSet(); + /** * Constructor. This KeyValueHeap will handle closing of passed in * KeyValueScanners. @@ -87,7 +91,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner if (scanner.peek() != null) { this.heap.add(scanner); } else { - scanner.close(); + this.scannersForDelayedClose.add(scanner); } } this.current = pollRealKV(); @@ -108,7 +112,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner Cell kvReturn = this.current.next(); Cell kvNext = this.current.peek(); if (kvNext == null) { - this.current.close(); + this.scannersForDelayedClose.add(this.current); this.current = pollRealKV(); } else { KeyValueScanner topScanner = this.heap.peek(); @@ -154,7 +158,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner */ if (pee == null || !moreCells) { - this.current.close(); + this.scannersForDelayedClose.add(this.current); } else { this.heap.add(this.current); } @@ -210,6 +214,10 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner } public void close() { + for (KeyValueScanner scanner : this.scannersForDelayedClose) { + scanner.close(); + } + this.scannersForDelayedClose.clear(); if (this.current != null) { this.current.close(); } @@ -311,7 +319,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner } if (!seekResult) { - scanner.close(); + this.scannersForDelayedClose.add(scanner); } else { heap.add(scanner); } @@ -364,12 +372,12 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner } else { // Close the scanner because we did a real seek and found out there // are no more KVs. - kvScanner.close(); + this.scannersForDelayedClose.add(kvScanner); } } else { // Close the scanner because it has already run out of KVs even before // we had to do a real seek on it. - kvScanner.close(); + this.scannersForDelayedClose.add(kvScanner); } kvScanner = heap.poll(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java index 5167b4e3f15..69141327670 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java @@ -85,7 +85,7 @@ public class ReversedKeyValueHeap extends KeyValueHeap { } if (!scanner.seekToPreviousRow(seekKey)) { - scanner.close(); + this.scannersForDelayedClose.add(scanner); } else { heap.add(scanner); } @@ -114,7 +114,7 @@ public class ReversedKeyValueHeap extends KeyValueHeap { return current != null; } if (!scanner.backwardSeek(seekKey)) { - scanner.close(); + this.scannersForDelayedClose.add(scanner); } else { heap.add(scanner); } @@ -134,7 +134,7 @@ public class ReversedKeyValueHeap extends KeyValueHeap { if (this.current.seekToPreviousRow(kvReturn)) { this.heap.add(this.current); } else { - this.current.close(); + this.scannersForDelayedClose.add(this.current); } this.current = pollRealKV(); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index 42a378da10d..e7a5af43cc6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -27,8 +27,6 @@ import java.util.List; import java.util.SortedSet; import java.util.concurrent.atomic.AtomicLong; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; @@ -46,8 +44,6 @@ import org.apache.hadoop.hbase.regionserver.StoreFile.Reader; */ @InterfaceAudience.LimitedPrivate("Coprocessor") public class StoreFileScanner implements KeyValueScanner { - private static final Log LOG = LogFactory.getLog(HStore.class); - // the reader it comes from: private final StoreFile.Reader reader; private final HFileScanner hfs; @@ -158,7 +154,7 @@ public class StoreFileScanner implements KeyValueScanner { try { try { if(!seekAtOrAfter(hfs, key)) { - close(); + this.cur = null; return false; } @@ -185,7 +181,7 @@ public class StoreFileScanner implements KeyValueScanner { try { try { if (!reseekAtOrAfter(hfs, key)) { - close(); + this.cur = null; return false; } setCurrentCell(hfs.getKeyValue()); @@ -219,7 +215,7 @@ public class StoreFileScanner implements KeyValueScanner { Cell startKV = cur; while(enforceMVCC && cur != null - && (cur.getMvccVersion() > readPt)) { + && (cur.getSequenceId() > readPt)) { hfs.next(); setCurrentCell(hfs.getKeyValue()); if (this.stopSkippingKVsIfNextRow @@ -229,7 +225,6 @@ public class StoreFileScanner implements KeyValueScanner { } if (cur == null) { - close(); return false; } @@ -237,8 +232,8 @@ public class StoreFileScanner implements KeyValueScanner { } public void close() { - // Nothing to close on HFileScanner? cur = null; + this.hfs.close(); } /** @@ -421,7 +416,6 @@ public class StoreFileScanner implements KeyValueScanner { } @Override - @SuppressWarnings("deprecation") public boolean seekToPreviousRow(Cell key) throws IOException { try { try { @@ -429,7 +423,7 @@ public class StoreFileScanner implements KeyValueScanner { key.getRowLength()); if (seekCount != null) seekCount.incrementAndGet(); if (!hfs.seekBefore(seekKey)) { - close(); + this.cur = null; return false; } KeyValue firstKeyOfPreviousRow = KeyValueUtil.createFirstOnRow(hfs.getKeyValue() @@ -437,7 +431,7 @@ public class StoreFileScanner implements KeyValueScanner { if (seekCount != null) seekCount.incrementAndGet(); if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) { - close(); + this.cur = null; return false; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index cbca57bb548..4be5c7b6a73 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -22,8 +22,10 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.NavigableSet; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.ReentrantLock; @@ -84,6 +86,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner protected final long maxRowSize; protected final long cellsPerHeartbeatCheck; + protected Set heapsForDelayedClose = new HashSet(); + /** * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not * KVs skipped via seeking to next row/column. TODO: estimate them? @@ -437,17 +441,32 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner @Override public void close() { + close(true); + } + + private void close(boolean withHeapClose){ lock.lock(); try { - if (this.closing) return; - this.closing = true; - // under test, we dont have a this.store - if (this.store != null) - this.store.deleteChangedReaderObserver(this); - if (this.heap != null) - this.heap.close(); - this.heap = null; // CLOSED! - this.lastTop = null; // If both are null, we are closed. + if (this.closing) return; + this.closing = true; + // under test, we dont have a this.store + if (this.store != null) this.store.deleteChangedReaderObserver(this); + if (withHeapClose) { + for (KeyValueHeap h : this.heapsForDelayedClose) { + h.close(); + } + this.heapsForDelayedClose.clear(); + if (this.heap != null) { + this.heap.close(); + this.heap = null; // CLOSED! + } + } else { + if (this.heap != null) { + this.heapsForDelayedClose.add(this.heap); + this.heap = null; + } + } + this.lastTop = null; // If both are null, we are closed. } finally { lock.unlock(); } @@ -491,13 +510,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // if the heap was left null, then the scanners had previously run out anyways, close and // return. if (this.heap == null) { - close(); + close(false);// Do all cleanup except heap.close() return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } Cell peeked = this.heap.peek(); if (peeked == null) { - close(); + close(false);// Do all cleanup except heap.close() return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } @@ -547,7 +566,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner Filter f = matcher.getFilter(); if (f != null) { - // TODO convert Scan Query Matcher to be Cell instead of KV based ? cell = f.transformCell(cell); } @@ -604,7 +622,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); case DONE_SCAN: - close(); + close(false);// Do all cleanup except heap.close() return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); case SEEK_NEXT_ROW: @@ -626,7 +644,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner break; case SEEK_NEXT_USING_HINT: - // TODO convert resee to Cell? Cell nextKV = matcher.getNextKeyHint(cell); if (nextKV != null) { seekAsDirection(nextKV); @@ -645,7 +662,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } // No more keys - close(); + close(false);// Do all cleanup except heap.close() return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } finally { lock.unlock(); @@ -705,7 +722,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner //DebugPrint.println("SS updateReaders, topKey = " + lastTop); // close scanners to old obsolete Store files - this.heap.close(); // bubble thru and close all scanners. + this.heapsForDelayedClose.add(this.heap);// Don't close now. Delay it till StoreScanner#close this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP // Let the next() call handle re-creating and seeking diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java index 0fa904c14e7..5b0ab3bb99f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java @@ -190,12 +190,14 @@ public class TestKeyValueHeap extends HBaseTestCase { l1.add(new KeyValue(row1, fam1, col5, data)); l1.add(new KeyValue(row2, fam1, col1, data)); l1.add(new KeyValue(row2, fam1, col2, data)); - scanners.add(new Scanner(l1)); + Scanner s1 = new Scanner(l1); + scanners.add(s1); List l2 = new ArrayList(); l2.add(new KeyValue(row1, fam1, col1, data)); l2.add(new KeyValue(row1, fam1, col2, data)); - scanners.add(new Scanner(l2)); + Scanner s2 = new Scanner(l2); + scanners.add(s2); List l3 = new ArrayList(); l3.add(new KeyValue(row1, fam1, col3, data)); @@ -203,16 +205,25 @@ public class TestKeyValueHeap extends HBaseTestCase { l3.add(new KeyValue(row1, fam2, col1, data)); l3.add(new KeyValue(row1, fam2, col2, data)); l3.add(new KeyValue(row2, fam1, col3, data)); - scanners.add(new Scanner(l3)); + Scanner s3 = new Scanner(l3); + scanners.add(s3); List l4 = new ArrayList(); - scanners.add(new Scanner(l4)); + Scanner s4 = new Scanner(l4); + scanners.add(s4); //Creating KeyValueHeap KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparator.COMPARATOR); while(kvh.next() != null); - + // Once the internal scanners go out of Cells, those will be removed from KVHeap's priority + // queue and added to a Set for lazy close. The actual close will happen only on KVHeap#close() + assertEquals(4, kvh.scannersForDelayedClose.size()); + assertTrue(kvh.scannersForDelayedClose.contains(s1)); + assertTrue(kvh.scannersForDelayedClose.contains(s2)); + assertTrue(kvh.scannersForDelayedClose.contains(s3)); + assertTrue(kvh.scannersForDelayedClose.contains(s4)); + kvh.close(); for(KeyValueScanner scanner : scanners) { assertTrue(((Scanner)scanner).isClosed()); }