HBASE-4197 RegionServer expects all scanner to be subclasses of

HRegion.RegionScanner (Lars Hofhansl)


git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1158058 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2011-08-15 23:10:12 +00:00
parent 11c7458018
commit f68b180a9f
10 changed files with 99 additions and 77 deletions

View File

@ -14,6 +14,8 @@ Release 0.91.0 - Unreleased
HBASE-451 Remove HTableDescriptor from HRegionInfo
addendum that fixes TestTableMapReduce
HBASE-3534 Action should not store or serialize regionName (Ted Yu)
HBASE-4197 RegionServer expects all scanner to be subclasses of
HRegion.RegionScanner (Lars Hofhansl)
BUG FIXES
HBASE-3280 YouAreDeadException being swallowed in HRS getMaster

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -196,14 +197,14 @@ public abstract class BaseRegionObserver implements RegionObserver {
}
@Override
public InternalScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
final Scan scan, final InternalScanner s) throws IOException {
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
final Scan scan, final RegionScanner s) throws IOException {
return s;
}
@Override
public InternalScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
final Scan scan, final InternalScanner s) throws IOException {
public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
final Scan scan, final RegionScanner s) throws IOException {
return s;
}

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -451,12 +452,12 @@ public interface RegionObserver extends Coprocessor {
* @param c the environment provided by the region server
* @param scan the Scan specification
* @param s if not null, the base scanner
* @return an InternalScanner instance to use instead of the base scanner if
* @return an RegionScanner instance to use instead of the base scanner if
* overriding default behavior, null otherwise
* @throws IOException if an error occurred on the coprocessor
*/
InternalScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan, final InternalScanner s)
RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan, final RegionScanner s)
throws IOException;
/**
@ -470,8 +471,8 @@ public interface RegionObserver extends Coprocessor {
* @return the scanner instance to use
* @throws IOException if an error occurred on the coprocessor
*/
InternalScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan, final InternalScanner s)
RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan, final RegionScanner s)
throws IOException;
/**

View File

@ -1322,10 +1322,10 @@ public class HRegion implements HeapSize { // , Writable{
* This Iterator must be closed by the caller.
*
* @param scan configured {@link Scan}
* @return InternalScanner
* @return RegionScanner
* @throws IOException read exceptions
*/
public InternalScanner getScanner(Scan scan) throws IOException {
public RegionScanner getScanner(Scan scan) throws IOException {
return getScanner(scan, null);
}
@ -1338,7 +1338,7 @@ public class HRegion implements HeapSize { // , Writable{
}
}
protected InternalScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
protected RegionScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
startRegionOperation();
this.readRequestsCount.increment();
try {
@ -1349,16 +1349,16 @@ public class HRegion implements HeapSize { // , Writable{
checkFamily(family);
}
}
return instantiateInternalScanner(scan, additionalScanners);
return instantiateRegionScanner(scan, additionalScanners);
} finally {
closeRegionOperation();
}
}
protected InternalScanner instantiateInternalScanner(Scan scan,
protected RegionScanner instantiateRegionScanner(Scan scan,
List<KeyValueScanner> additionalScanners) throws IOException {
return new RegionScanner(scan, additionalScanners);
return new RegionScannerImpl(scan, additionalScanners);
}
/*
@ -2582,11 +2582,9 @@ public class HRegion implements HeapSize { // , Writable{
}
/**
* RegionScanner is an iterator through a bunch of rows in an HRegion.
* <p>
* It is used to combine scanners from multiple Stores (aka column families).
* RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
*/
class RegionScanner implements InternalScanner {
class RegionScannerImpl implements RegionScanner {
// Package local for testability
KeyValueHeap storeHeap = null;
private final byte [] stopRow;
@ -2597,10 +2595,10 @@ public class HRegion implements HeapSize { // , Writable{
private boolean filterClosed = false;
private long readPt;
public HRegionInfo getRegionName() {
public HRegionInfo getRegionInfo() {
return regionInfo;
}
RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
//DebugPrint.println("HRegionScanner.<init>");
this.filter = scan.getFilter();
this.batch = scan.getBatch();
@ -2628,7 +2626,7 @@ public class HRegion implements HeapSize { // , Writable{
this.storeHeap = new KeyValueHeap(scanners, comparator);
}
RegionScanner(Scan scan) throws IOException {
RegionScannerImpl(Scan scan) throws IOException {
this(scan, null);
}
@ -2681,7 +2679,7 @@ public class HRegion implements HeapSize { // , Writable{
/*
* @return True if a filter rules the scanner is over, done.
*/
synchronized boolean isFilterDone() {
public synchronized boolean isFilterDone() {
return this.filter != null && this.filter.filterAllRemaining();
}
@ -3360,7 +3358,7 @@ public class HRegion implements HeapSize { // , Writable{
// memstore scan
iscan.checkOnlyMemStore();
InternalScanner scanner = null;
RegionScanner scanner = null;
try {
scanner = getScanner(iscan);
scanner.next(results);
@ -3440,7 +3438,7 @@ public class HRegion implements HeapSize { // , Writable{
}
}
InternalScanner scanner = null;
RegionScanner scanner = null;
try {
scanner = getScanner(scan);
scanner.next(results);
@ -3831,7 +3829,7 @@ public class HRegion implements HeapSize { // , Writable{
// Default behavior
Scan scan = new Scan();
// scan.addFamily(HConstants.CATALOG_FAMILY);
InternalScanner scanner = region.getScanner(scan);
RegionScanner scanner = region.getScanner(scan);
try {
List<KeyValue> kvs = new ArrayList<KeyValue>();
boolean done = false;

View File

@ -252,8 +252,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
// flag set after we're done setting up server threads (used for testing)
protected volatile boolean isOnline;
final Map<String, InternalScanner> scanners =
new ConcurrentHashMap<String, InternalScanner>();
final Map<String, RegionScanner> scanners =
new ConcurrentHashMap<String, RegionScanner>();
// zookeeper connection and watcher
private ZooKeeperWatcher zooKeeper;
@ -451,14 +451,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
return NORMAL_QOS; // doh.
}
String scannerIdString = Long.toString(scannerId);
InternalScanner scanner = scanners.get(scannerIdString);
if (scanner instanceof HRegion.RegionScanner) {
HRegion.RegionScanner rs = (HRegion.RegionScanner) scanner;
HRegionInfo regionName = rs.getRegionName();
if (regionName.isMetaRegion()) {
// LOG.debug("High priority scanner request: " + scannerId);
return HIGH_QOS;
}
RegionScanner scanner = scanners.get(scannerIdString);
HRegionInfo regionName = scanner.getRegionInfo();
if (regionName.isMetaRegion()) {
// LOG.debug("High priority scanner request: " + scannerId);
return HIGH_QOS;
}
} else if (inv.getParameterClasses().length == 0) {
// Just let it through. This is getOnlineRegions, etc.
@ -823,7 +820,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
private void closeAllScanners() {
// Close any outstanding scanners. Means they'll get an UnknownScanner
// exception next time they come in.
for (Map.Entry<String, InternalScanner> e : this.scanners.entrySet()) {
for (Map.Entry<String, RegionScanner> e : this.scanners.entrySet()) {
try {
e.getValue().close();
} catch (IOException ioe) {
@ -1955,7 +1952,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
try {
HRegion r = getRegion(regionName);
r.prepareScanner(scan);
InternalScanner s = null;
RegionScanner s = null;
if (r.getCoprocessorHost() != null) {
s = r.getCoprocessorHost().preScannerOpen(scan);
}
@ -1971,7 +1968,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
}
}
protected long addScanner(InternalScanner s) throws LeaseStillHeldException {
protected long addScanner(RegionScanner s) throws LeaseStillHeldException {
long scannerId = -1L;
scannerId = rand.nextLong();
String scannerName = String.valueOf(scannerId);
@ -1990,7 +1987,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
public Result[] next(final long scannerId, int nbRows) throws IOException {
String scannerName = String.valueOf(scannerId);
InternalScanner s = this.scanners.get(scannerName);
RegionScanner s = this.scanners.get(scannerName);
if (s == null) throw new UnknownScannerException("Name: " + scannerName);
try {
checkOpen();
@ -2015,14 +2012,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
List<KeyValue> values = new ArrayList<KeyValue>();
// Call coprocessor. Get region info from scanner.
HRegion region = null;
if (s instanceof HRegion.RegionScanner) {
HRegion.RegionScanner rs = (HRegion.RegionScanner) s;
region = getRegion(rs.getRegionName().getRegionName());
} else {
throw new IOException("InternalScanner implementation is expected " +
"to be HRegion.RegionScanner.");
}
HRegion region = getRegion(s.getRegionInfo().getRegionName());
if (region != null && region.getCoprocessorHost() != null) {
Boolean bypass = region.getCoprocessorHost().preScannerNext(s,
results, nbRows);
@ -2034,7 +2024,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
}
}
if (bypass != null) {
return ((HRegion.RegionScanner) s).isFilterDone() && results.isEmpty() ? null
return s.isFilterDone() && results.isEmpty() ? null
: results.toArray(new Result[0]);
}
}
@ -2061,13 +2051,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
region.getCoprocessorHost().postScannerNext(s, results, nbRows, true);
}
// Below is an ugly hack where we cast the InternalScanner to be a
// HRegion.RegionScanner. The alternative is to change InternalScanner
// interface but its used everywhere whereas we just need a bit of info
// from HRegion.RegionScanner, IF its filter if any is done with the scan
// If the scanner's filter - if any - is done with the scan
// and wants to tell the client to stop the scan. This is done by passing
// a null result.
return ((HRegion.RegionScanner) s).isFilterDone() && results.isEmpty() ? null
return s.isFilterDone() && results.isEmpty() ? null
: results.toArray(new Result[0]);
} catch (Throwable t) {
if (t instanceof NotServingRegionException) {
@ -2088,18 +2075,12 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
checkOpen();
requestCount.incrementAndGet();
String scannerName = String.valueOf(scannerId);
InternalScanner s = scanners.get(scannerName);
RegionScanner s = scanners.get(scannerName);
HRegion region = null;
if (s != null) {
// call coprocessor.
if (s instanceof HRegion.RegionScanner) {
HRegion.RegionScanner rs = (HRegion.RegionScanner) s;
region = getRegion(rs.getRegionName().getRegionName());
} else {
throw new IOException("InternalScanner implementation is expected " +
"to be HRegion.RegionScanner.");
}
region = getRegion(s.getRegionInfo().getRegionName());
if (region != null && region.getCoprocessorHost() != null) {
if (region.getCoprocessorHost().preScannerClose(s)) {
return; // bypass
@ -2134,7 +2115,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
public void leaseExpired() {
LOG.info("Scanner " + this.scannerName + " lease expired");
InternalScanner s = scanners.remove(this.scannerName);
RegionScanner s = scanners.remove(this.scannerName);
if (s != null) {
try {
s.close();

View File

@ -803,9 +803,9 @@ public class RegionCoprocessorHost
* bypassed, false otherwise
* @exception IOException Exception
*/
public InternalScanner preScannerOpen(Scan scan) throws IOException {
public RegionScanner preScannerOpen(Scan scan) throws IOException {
boolean bypass = false;
InternalScanner s = null;
RegionScanner s = null;
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
@ -826,7 +826,7 @@ public class RegionCoprocessorHost
* @return the scanner instance to use
* @exception IOException Exception
*/
public InternalScanner postScannerOpen(final Scan scan, InternalScanner s)
public RegionScanner postScannerOpen(final Scan scan, RegionScanner s)
throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env: coprocessors) {

View File

@ -0,0 +1,38 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.HRegionInfo;
/**
* RegionScanner describes iterators over rows in an HRegion.
*/
public interface RegionScanner extends InternalScanner {
/**
* @return The RegionInfo for this scanner.
*/
public HRegionInfo getRegionInfo();
/**
* @return True if a filter indicates that this scanner will return no
* further rows.
*/
public boolean isFilterDone();
}

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
/**
@ -144,16 +145,16 @@ public class SimpleRegionObserver extends BaseRegionObserver {
}
@Override
public InternalScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan,
final InternalScanner s) throws IOException {
final RegionScanner s) throws IOException {
hadPreScannerOpen = true;
return null;
}
@Override
public InternalScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan, final InternalScanner s)
public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan, final RegionScanner s)
throws IOException {
hadPostScannerOpen = true;
return s;

View File

@ -66,7 +66,7 @@ import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.NullComparator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -1439,22 +1439,22 @@ public class TestHRegion extends HBaseTestCase {
region.put(put);
Scan scan = null;
HRegion.RegionScanner is = null;
HRegion.RegionScannerImpl is = null;
//Testing to see how many scanners that is produced by getScanner, starting
//with known number, 2 - current = 1
scan = new Scan();
scan.addFamily(fam2);
scan.addFamily(fam4);
is = (RegionScanner) region.getScanner(scan);
is = (RegionScannerImpl) region.getScanner(scan);
ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC());
assertEquals(1, ((RegionScanner)is).storeHeap.getHeap().size());
assertEquals(1, ((RegionScannerImpl)is).storeHeap.getHeap().size());
scan = new Scan();
is = (RegionScanner) region.getScanner(scan);
is = (RegionScannerImpl) region.getScanner(scan);
ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC());
assertEquals(families.length -1,
((RegionScanner)is).storeHeap.getHeap().size());
((RegionScannerImpl)is).storeHeap.getHeap().size());
}
/**

View File

@ -138,7 +138,7 @@ public class TestWideScanner extends HBaseTestCase {
// trigger ChangedReadersObservers
Iterator<KeyValueScanner> scanners =
((HRegion.RegionScanner)s).storeHeap.getHeap().iterator();
((HRegion.RegionScannerImpl)s).storeHeap.getHeap().iterator();
while (scanners.hasNext()) {
StoreScanner ss = (StoreScanner)scanners.next();
ss.updateReaders();