From f68b180a9f1ed4300cccd5c0b031c93ad7f75146 Mon Sep 17 00:00:00 2001 From: Zhihong Yu Date: Mon, 15 Aug 2011 23:10:12 +0000 Subject: [PATCH] HBASE-4197 RegionServer expects all scanner to be subclasses of HRegion.RegionScanner (Lars Hofhansl) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1158058 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 2 + .../hbase/coprocessor/BaseRegionObserver.java | 9 +-- .../hbase/coprocessor/RegionObserver.java | 11 ++-- .../hadoop/hbase/regionserver/HRegion.java | 32 +++++------ .../hbase/regionserver/HRegionServer.java | 55 ++++++------------- .../regionserver/RegionCoprocessorHost.java | 6 +- .../hbase/regionserver/RegionScanner.java | 38 +++++++++++++ .../coprocessor/SimpleRegionObserver.java | 9 +-- .../hbase/regionserver/TestHRegion.java | 12 ++-- .../hbase/regionserver/TestWideScanner.java | 2 +- 10 files changed, 99 insertions(+), 77 deletions(-) create mode 100644 src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java diff --git a/CHANGES.txt b/CHANGES.txt index 6d06ff108c6..bb990f84eeb 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -14,6 +14,8 @@ Release 0.91.0 - Unreleased HBASE-451 Remove HTableDescriptor from HRegionInfo addendum that fixes TestTableMapReduce HBASE-3534 Action should not store or serialize regionName (Ted Yu) + HBASE-4197 RegionServer expects all scanner to be subclasses of + HRegion.RegionScanner (Lars Hofhansl) BUG FIXES HBASE-3280 YouAreDeadException being swallowed in HRS getMaster diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java index ec88a015130..008dc331045 100644 --- a/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.WritableByteArrayComparable; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -196,14 +197,14 @@ public abstract class BaseRegionObserver implements RegionObserver { } @Override - public InternalScanner preScannerOpen(final ObserverContext e, - final Scan scan, final InternalScanner s) throws IOException { + public RegionScanner preScannerOpen(final ObserverContext e, + final Scan scan, final RegionScanner s) throws IOException { return s; } @Override - public InternalScanner postScannerOpen(final ObserverContext e, - final Scan scan, final InternalScanner s) throws IOException { + public RegionScanner postScannerOpen(final ObserverContext e, + final Scan scan, final RegionScanner s) throws IOException { return s; } diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index cfbb29d9c10..9e343638afa 100644 --- a/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.WritableByteArrayComparable; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -451,12 +452,12 @@ public interface RegionObserver extends Coprocessor { * @param c the environment provided by the region server * @param scan the Scan specification * @param s if not null, the base scanner - * @return an InternalScanner instance to use instead of the base scanner if + * @return an RegionScanner instance to use instead of the base scanner if * overriding default behavior, null otherwise * @throws IOException if an error occurred on the coprocessor */ - InternalScanner preScannerOpen(final ObserverContext c, - final Scan scan, final InternalScanner s) + RegionScanner preScannerOpen(final ObserverContext c, + final Scan scan, final RegionScanner s) throws IOException; /** @@ -470,8 +471,8 @@ public interface RegionObserver extends Coprocessor { * @return the scanner instance to use * @throws IOException if an error occurred on the coprocessor */ - InternalScanner postScannerOpen(final ObserverContext c, - final Scan scan, final InternalScanner s) + RegionScanner postScannerOpen(final ObserverContext c, + final Scan scan, final RegionScanner s) throws IOException; /** diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 83ff7b2e5d7..6d3114320f4 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1322,10 +1322,10 @@ public class HRegion implements HeapSize { // , Writable{ * This Iterator must be closed by the caller. * * @param scan configured {@link Scan} - * @return InternalScanner + * @return RegionScanner * @throws IOException read exceptions */ - public InternalScanner getScanner(Scan scan) throws IOException { + public RegionScanner getScanner(Scan scan) throws IOException { return getScanner(scan, null); } @@ -1338,7 +1338,7 @@ public class HRegion implements HeapSize { // , Writable{ } } - protected InternalScanner getScanner(Scan scan, List additionalScanners) throws IOException { + protected RegionScanner getScanner(Scan scan, List additionalScanners) throws IOException { startRegionOperation(); this.readRequestsCount.increment(); try { @@ -1349,16 +1349,16 @@ public class HRegion implements HeapSize { // , Writable{ checkFamily(family); } } - return instantiateInternalScanner(scan, additionalScanners); + return instantiateRegionScanner(scan, additionalScanners); } finally { closeRegionOperation(); } } - protected InternalScanner instantiateInternalScanner(Scan scan, + protected RegionScanner instantiateRegionScanner(Scan scan, List additionalScanners) throws IOException { - return new RegionScanner(scan, additionalScanners); + return new RegionScannerImpl(scan, additionalScanners); } /* @@ -2582,11 +2582,9 @@ public class HRegion implements HeapSize { // , Writable{ } /** - * RegionScanner is an iterator through a bunch of rows in an HRegion. - *

- * It is used to combine scanners from multiple Stores (aka column families). + * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families). */ - class RegionScanner implements InternalScanner { + class RegionScannerImpl implements RegionScanner { // Package local for testability KeyValueHeap storeHeap = null; private final byte [] stopRow; @@ -2597,10 +2595,10 @@ public class HRegion implements HeapSize { // , Writable{ private boolean filterClosed = false; private long readPt; - public HRegionInfo getRegionName() { + public HRegionInfo getRegionInfo() { return regionInfo; } - RegionScanner(Scan scan, List additionalScanners) throws IOException { + RegionScannerImpl(Scan scan, List additionalScanners) throws IOException { //DebugPrint.println("HRegionScanner."); this.filter = scan.getFilter(); this.batch = scan.getBatch(); @@ -2628,7 +2626,7 @@ public class HRegion implements HeapSize { // , Writable{ this.storeHeap = new KeyValueHeap(scanners, comparator); } - RegionScanner(Scan scan) throws IOException { + RegionScannerImpl(Scan scan) throws IOException { this(scan, null); } @@ -2681,7 +2679,7 @@ public class HRegion implements HeapSize { // , Writable{ /* * @return True if a filter rules the scanner is over, done. */ - synchronized boolean isFilterDone() { + public synchronized boolean isFilterDone() { return this.filter != null && this.filter.filterAllRemaining(); } @@ -3360,7 +3358,7 @@ public class HRegion implements HeapSize { // , Writable{ // memstore scan iscan.checkOnlyMemStore(); - InternalScanner scanner = null; + RegionScanner scanner = null; try { scanner = getScanner(iscan); scanner.next(results); @@ -3440,7 +3438,7 @@ public class HRegion implements HeapSize { // , Writable{ } } - InternalScanner scanner = null; + RegionScanner scanner = null; try { scanner = getScanner(scan); scanner.next(results); @@ -3831,7 +3829,7 @@ public class HRegion implements HeapSize { // , Writable{ // Default behavior Scan scan = new Scan(); // scan.addFamily(HConstants.CATALOG_FAMILY); - InternalScanner scanner = region.getScanner(scan); + RegionScanner scanner = region.getScanner(scan); try { List kvs = new ArrayList(); boolean done = false; diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 7a917dac2d6..ea7213e9972 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -252,8 +252,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, // flag set after we're done setting up server threads (used for testing) protected volatile boolean isOnline; - final Map scanners = - new ConcurrentHashMap(); + final Map scanners = + new ConcurrentHashMap(); // zookeeper connection and watcher private ZooKeeperWatcher zooKeeper; @@ -451,14 +451,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, return NORMAL_QOS; // doh. } String scannerIdString = Long.toString(scannerId); - InternalScanner scanner = scanners.get(scannerIdString); - if (scanner instanceof HRegion.RegionScanner) { - HRegion.RegionScanner rs = (HRegion.RegionScanner) scanner; - HRegionInfo regionName = rs.getRegionName(); - if (regionName.isMetaRegion()) { - // LOG.debug("High priority scanner request: " + scannerId); - return HIGH_QOS; - } + RegionScanner scanner = scanners.get(scannerIdString); + HRegionInfo regionName = scanner.getRegionInfo(); + if (regionName.isMetaRegion()) { + // LOG.debug("High priority scanner request: " + scannerId); + return HIGH_QOS; } } else if (inv.getParameterClasses().length == 0) { // Just let it through. This is getOnlineRegions, etc. @@ -823,7 +820,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, private void closeAllScanners() { // Close any outstanding scanners. Means they'll get an UnknownScanner // exception next time they come in. - for (Map.Entry e : this.scanners.entrySet()) { + for (Map.Entry e : this.scanners.entrySet()) { try { e.getValue().close(); } catch (IOException ioe) { @@ -1955,7 +1952,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, try { HRegion r = getRegion(regionName); r.prepareScanner(scan); - InternalScanner s = null; + RegionScanner s = null; if (r.getCoprocessorHost() != null) { s = r.getCoprocessorHost().preScannerOpen(scan); } @@ -1971,7 +1968,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, } } - protected long addScanner(InternalScanner s) throws LeaseStillHeldException { + protected long addScanner(RegionScanner s) throws LeaseStillHeldException { long scannerId = -1L; scannerId = rand.nextLong(); String scannerName = String.valueOf(scannerId); @@ -1990,7 +1987,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, public Result[] next(final long scannerId, int nbRows) throws IOException { String scannerName = String.valueOf(scannerId); - InternalScanner s = this.scanners.get(scannerName); + RegionScanner s = this.scanners.get(scannerName); if (s == null) throw new UnknownScannerException("Name: " + scannerName); try { checkOpen(); @@ -2015,14 +2012,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, List values = new ArrayList(); // Call coprocessor. Get region info from scanner. - HRegion region = null; - if (s instanceof HRegion.RegionScanner) { - HRegion.RegionScanner rs = (HRegion.RegionScanner) s; - region = getRegion(rs.getRegionName().getRegionName()); - } else { - throw new IOException("InternalScanner implementation is expected " + - "to be HRegion.RegionScanner."); - } + HRegion region = getRegion(s.getRegionInfo().getRegionName()); if (region != null && region.getCoprocessorHost() != null) { Boolean bypass = region.getCoprocessorHost().preScannerNext(s, results, nbRows); @@ -2034,7 +2024,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, } } if (bypass != null) { - return ((HRegion.RegionScanner) s).isFilterDone() && results.isEmpty() ? null + return s.isFilterDone() && results.isEmpty() ? null : results.toArray(new Result[0]); } } @@ -2061,13 +2051,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, region.getCoprocessorHost().postScannerNext(s, results, nbRows, true); } - // Below is an ugly hack where we cast the InternalScanner to be a - // HRegion.RegionScanner. The alternative is to change InternalScanner - // interface but its used everywhere whereas we just need a bit of info - // from HRegion.RegionScanner, IF its filter if any is done with the scan + // If the scanner's filter - if any - is done with the scan // and wants to tell the client to stop the scan. This is done by passing // a null result. - return ((HRegion.RegionScanner) s).isFilterDone() && results.isEmpty() ? null + return s.isFilterDone() && results.isEmpty() ? null : results.toArray(new Result[0]); } catch (Throwable t) { if (t instanceof NotServingRegionException) { @@ -2088,18 +2075,12 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, checkOpen(); requestCount.incrementAndGet(); String scannerName = String.valueOf(scannerId); - InternalScanner s = scanners.get(scannerName); + RegionScanner s = scanners.get(scannerName); HRegion region = null; if (s != null) { // call coprocessor. - if (s instanceof HRegion.RegionScanner) { - HRegion.RegionScanner rs = (HRegion.RegionScanner) s; - region = getRegion(rs.getRegionName().getRegionName()); - } else { - throw new IOException("InternalScanner implementation is expected " + - "to be HRegion.RegionScanner."); - } + region = getRegion(s.getRegionInfo().getRegionName()); if (region != null && region.getCoprocessorHost() != null) { if (region.getCoprocessorHost().preScannerClose(s)) { return; // bypass @@ -2134,7 +2115,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, public void leaseExpired() { LOG.info("Scanner " + this.scannerName + " lease expired"); - InternalScanner s = scanners.remove(this.scannerName); + RegionScanner s = scanners.remove(this.scannerName); if (s != null) { try { s.close(); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index c44da73f3db..988ddc26741 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -803,9 +803,9 @@ public class RegionCoprocessorHost * bypassed, false otherwise * @exception IOException Exception */ - public InternalScanner preScannerOpen(Scan scan) throws IOException { + public RegionScanner preScannerOpen(Scan scan) throws IOException { boolean bypass = false; - InternalScanner s = null; + RegionScanner s = null; ObserverContext ctx = null; for (RegionEnvironment env: coprocessors) { if (env.getInstance() instanceof RegionObserver) { @@ -826,7 +826,7 @@ public class RegionCoprocessorHost * @return the scanner instance to use * @exception IOException Exception */ - public InternalScanner postScannerOpen(final Scan scan, InternalScanner s) + public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException { ObserverContext ctx = null; for (RegionEnvironment env: coprocessors) { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java b/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java new file mode 100644 index 00000000000..b10aecafa7e --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java @@ -0,0 +1,38 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.HRegionInfo; + +/** + * RegionScanner describes iterators over rows in an HRegion. + */ +public interface RegionScanner extends InternalScanner { + /** + * @return The RegionInfo for this scanner. + */ + public HRegionInfo getRegionInfo(); + + /** + * @return True if a filter indicates that this scanner will return no + * further rows. + */ + public boolean isFilterDone(); +} diff --git a/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java index c5e81e701c3..c0b726771ea 100644 --- a/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java +++ b/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; /** @@ -144,16 +145,16 @@ public class SimpleRegionObserver extends BaseRegionObserver { } @Override - public InternalScanner preScannerOpen(final ObserverContext c, + public RegionScanner preScannerOpen(final ObserverContext c, final Scan scan, - final InternalScanner s) throws IOException { + final RegionScanner s) throws IOException { hadPreScannerOpen = true; return null; } @Override - public InternalScanner postScannerOpen(final ObserverContext c, - final Scan scan, final InternalScanner s) + public RegionScanner postScannerOpen(final ObserverContext c, + final Scan scan, final RegionScanner s) throws IOException { hadPostScannerOpen = true; return s; diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index a243011ef67..8e9c8927210 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -66,7 +66,7 @@ import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.NullComparator; import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; -import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner; +import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -1439,22 +1439,22 @@ public class TestHRegion extends HBaseTestCase { region.put(put); Scan scan = null; - HRegion.RegionScanner is = null; + HRegion.RegionScannerImpl is = null; //Testing to see how many scanners that is produced by getScanner, starting //with known number, 2 - current = 1 scan = new Scan(); scan.addFamily(fam2); scan.addFamily(fam4); - is = (RegionScanner) region.getScanner(scan); + is = (RegionScannerImpl) region.getScanner(scan); ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC()); - assertEquals(1, ((RegionScanner)is).storeHeap.getHeap().size()); + assertEquals(1, ((RegionScannerImpl)is).storeHeap.getHeap().size()); scan = new Scan(); - is = (RegionScanner) region.getScanner(scan); + is = (RegionScannerImpl) region.getScanner(scan); ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC()); assertEquals(families.length -1, - ((RegionScanner)is).storeHeap.getHeap().size()); + ((RegionScannerImpl)is).storeHeap.getHeap().size()); } /** diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java index 3f03923e456..b2cd21cf44a 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java @@ -138,7 +138,7 @@ public class TestWideScanner extends HBaseTestCase { // trigger ChangedReadersObservers Iterator scanners = - ((HRegion.RegionScanner)s).storeHeap.getHeap().iterator(); + ((HRegion.RegionScannerImpl)s).storeHeap.getHeap().iterator(); while (scanners.hasNext()) { StoreScanner ss = (StoreScanner)scanners.next(); ss.updateReaders();