HBASE-9926 Scanner doesn't check if a region is available

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1540883 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
jxiang 2013-11-11 23:40:37 +00:00
parent 3bd91918a5
commit 26ddfe10b6
2 changed files with 280 additions and 195 deletions

View File

@ -2643,15 +2643,6 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
return t;
}
/*
* @param t
*
* @return Make <code>t</code> an IOE if it isn't already.
*/
protected IOException convertThrowableToIOE(final Throwable t) {
return convertThrowableToIOE(t, null);
}
/*
* @param t
*
@ -2709,15 +2700,16 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
return this.fsOk;
}
protected long addScanner(RegionScanner s) throws LeaseStillHeldException {
protected long addScanner(RegionScanner s, HRegion r) throws LeaseStillHeldException {
long scannerId = -1;
while (true) {
scannerId = Math.abs(rand.nextLong() << 24) ^ startcode;
String scannerName = String.valueOf(scannerId);
RegionScannerHolder existing = scanners.putIfAbsent(scannerName, new RegionScannerHolder(s));
RegionScannerHolder existing =
scanners.putIfAbsent(scannerName, new RegionScannerHolder(s, r));
if (existing == null) {
this.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
new ScannerListener(scannerName));
new ScannerListener(scannerName));
break;
}
}
@ -2970,190 +2962,191 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
}
requestCount.increment();
try {
int ttl = 0;
HRegion region = null;
RegionScanner scanner = null;
RegionScannerHolder rsh = null;
boolean moreResults = true;
boolean closeScanner = false;
ScanResponse.Builder builder = ScanResponse.newBuilder();
if (request.hasCloseScanner()) {
closeScanner = request.getCloseScanner();
}
int rows = 1;
if (request.hasNumberOfRows()) {
rows = request.getNumberOfRows();
}
if (request.hasScannerId()) {
rsh = scanners.get(scannerName);
if (rsh == null) {
LOG.info("Client tried to access missing scanner " + scannerName);
throw new UnknownScannerException(
"Name: " + scannerName + ", already closed?");
}
scanner = rsh.s;
region = getRegion(scanner.getRegionInfo().getRegionName());
} else {
region = getRegion(request.getRegion());
ClientProtos.Scan protoScan = request.getScan();
boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
Scan scan = ProtobufUtil.toScan(protoScan);
// if the request doesn't set this, get the default region setting.
if (!isLoadingCfsOnDemandSet) {
scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
}
scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
region.prepareScanner(scan);
if (region.getCoprocessorHost() != null) {
scanner = region.getCoprocessorHost().preScannerOpen(scan);
}
if (scanner == null) {
scanner = region.getScanner(scan);
}
if (region.getCoprocessorHost() != null) {
scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
}
scannerId = addScanner(scanner);
scannerName = String.valueOf(scannerId);
ttl = this.scannerLeaseTimeoutPeriod;
}
if (rows > 0) {
// if nextCallSeq does not match throw Exception straight away. This needs to be
// performed even before checking of Lease.
// See HBASE-5974
if (request.hasNextCallSeq()) {
if (rsh == null) {
rsh = scanners.get(scannerName);
}
if (rsh != null) {
if (request.getNextCallSeq() != rsh.nextCallSeq) {
throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
+ " But the nextCallSeq got from client: " + request.getNextCallSeq() +
"; request=" + TextFormat.shortDebugString(request));
}
// Increment the nextCallSeq value which is the next expected from client.
rsh.nextCallSeq++;
}
}
try {
// Remove lease while its being processed in server; protects against case
// where processing of request takes > lease expiration time.
lease = leases.removeLease(scannerName);
List<Result> results = new ArrayList<Result>(rows);
long currentScanResultSize = 0;
boolean done = false;
// Call coprocessor. Get region info from scanner.
if (region != null && region.getCoprocessorHost() != null) {
Boolean bypass = region.getCoprocessorHost().preScannerNext(
scanner, results, rows);
if (!results.isEmpty()) {
for (Result r : results) {
if (maxScannerResultSize < Long.MAX_VALUE){
for (Cell kv : r.rawCells()) {
// TODO
currentScanResultSize += KeyValueUtil.ensureKeyValue(kv).heapSize();
}
}
}
}
if (bypass != null && bypass.booleanValue()) {
done = true;
}
}
if (!done) {
long maxResultSize = scanner.getMaxResultSize();
if (maxResultSize <= 0) {
maxResultSize = maxScannerResultSize;
}
List<Cell> values = new ArrayList<Cell>();
region.startRegionOperation(Operation.SCAN);
try {
int i = 0;
synchronized(scanner) {
for (; i < rows
&& currentScanResultSize < maxResultSize; i++) {
// Collect values to be returned here
boolean moreRows = scanner.nextRaw(values);
if (!values.isEmpty()) {
if (maxScannerResultSize < Long.MAX_VALUE){
for (Cell kv : values) {
currentScanResultSize += KeyValueUtil.ensureKeyValue(kv).heapSize();
}
}
results.add(Result.create(values));
}
if (!moreRows) {
break;
}
values.clear();
}
}
region.readRequestsCount.add(i);
} finally {
region.closeRegionOperation();
}
// coprocessor postNext hook
if (region != null && region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
}
}
// If the scanner's filter - if any - is done with the scan
// and wants to tell the client to stop the scan. This is done by passing
// a null result, and setting moreResults to false.
if (scanner.isFilterDone() && results.isEmpty()) {
moreResults = false;
results = null;
} else {
addResults(builder, results, controller);
}
} finally {
// We're done. On way out re-add the above removed lease.
// Adding resets expiration time on lease.
if (scanners.containsKey(scannerName)) {
if (lease != null) leases.addLease(lease);
ttl = this.scannerLeaseTimeoutPeriod;
}
}
}
if (!moreResults || closeScanner) {
ttl = 0;
moreResults = false;
if (region != null && region.getCoprocessorHost() != null) {
if (region.getCoprocessorHost().preScannerClose(scanner)) {
return builder.build(); // bypass
}
}
rsh = scanners.remove(scannerName);
if (rsh != null) {
scanner = rsh.s;
scanner.close();
leases.cancelLease(scannerName);
if (region != null && region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerClose(scanner);
}
}
}
if (ttl > 0) {
builder.setTtl(ttl);
}
builder.setScannerId(scannerId);
builder.setMoreResults(moreResults);
return builder.build();
} catch (Throwable t) {
if (scannerName != null && t instanceof NotServingRegionException) {
scanners.remove(scannerName);
}
throw convertThrowableToIOE(cleanup(t));
int ttl = 0;
HRegion region = null;
RegionScanner scanner = null;
RegionScannerHolder rsh = null;
boolean moreResults = true;
boolean closeScanner = false;
ScanResponse.Builder builder = ScanResponse.newBuilder();
if (request.hasCloseScanner()) {
closeScanner = request.getCloseScanner();
}
int rows = 1;
if (request.hasNumberOfRows()) {
rows = request.getNumberOfRows();
}
if (request.hasScannerId()) {
rsh = scanners.get(scannerName);
if (rsh == null) {
LOG.info("Client tried to access missing scanner " + scannerName);
throw new UnknownScannerException(
"Name: " + scannerName + ", already closed?");
}
scanner = rsh.s;
HRegionInfo hri = scanner.getRegionInfo();
region = getRegion(hri.getRegionName());
if (region != rsh.r) { // Yes, should be the same instance
throw new NotServingRegionException("Region was re-opened after the scanner"
+ scannerName + " was created: " + hri.getRegionNameAsString());
}
} else {
region = getRegion(request.getRegion());
ClientProtos.Scan protoScan = request.getScan();
boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
Scan scan = ProtobufUtil.toScan(protoScan);
// if the request doesn't set this, get the default region setting.
if (!isLoadingCfsOnDemandSet) {
scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
}
scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
region.prepareScanner(scan);
if (region.getCoprocessorHost() != null) {
scanner = region.getCoprocessorHost().preScannerOpen(scan);
}
if (scanner == null) {
scanner = region.getScanner(scan);
}
if (region.getCoprocessorHost() != null) {
scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
}
scannerId = addScanner(scanner, region);
scannerName = String.valueOf(scannerId);
ttl = this.scannerLeaseTimeoutPeriod;
}
if (rows > 0) {
// if nextCallSeq does not match throw Exception straight away. This needs to be
// performed even before checking of Lease.
// See HBASE-5974
if (request.hasNextCallSeq()) {
if (rsh == null) {
rsh = scanners.get(scannerName);
}
if (rsh != null) {
if (request.getNextCallSeq() != rsh.nextCallSeq) {
throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
+ " But the nextCallSeq got from client: " + request.getNextCallSeq() +
"; request=" + TextFormat.shortDebugString(request));
}
// Increment the nextCallSeq value which is the next expected from client.
rsh.nextCallSeq++;
}
}
try {
// Remove lease while its being processed in server; protects against case
// where processing of request takes > lease expiration time.
lease = leases.removeLease(scannerName);
List<Result> results = new ArrayList<Result>(rows);
long currentScanResultSize = 0;
boolean done = false;
// Call coprocessor. Get region info from scanner.
if (region != null && region.getCoprocessorHost() != null) {
Boolean bypass = region.getCoprocessorHost().preScannerNext(
scanner, results, rows);
if (!results.isEmpty()) {
for (Result r : results) {
if (maxScannerResultSize < Long.MAX_VALUE){
for (Cell kv : r.rawCells()) {
// TODO
currentScanResultSize += KeyValueUtil.ensureKeyValue(kv).heapSize();
}
}
}
}
if (bypass != null && bypass.booleanValue()) {
done = true;
}
}
if (!done) {
long maxResultSize = scanner.getMaxResultSize();
if (maxResultSize <= 0) {
maxResultSize = maxScannerResultSize;
}
List<Cell> values = new ArrayList<Cell>();
region.startRegionOperation(Operation.SCAN);
try {
int i = 0;
synchronized(scanner) {
for (; i < rows
&& currentScanResultSize < maxResultSize; i++) {
// Collect values to be returned here
boolean moreRows = scanner.nextRaw(values);
if (!values.isEmpty()) {
if (maxScannerResultSize < Long.MAX_VALUE){
for (Cell kv : values) {
currentScanResultSize += KeyValueUtil.ensureKeyValue(kv).heapSize();
}
}
results.add(Result.create(values));
}
if (!moreRows) {
break;
}
values.clear();
}
}
region.readRequestsCount.add(i);
} finally {
region.closeRegionOperation();
}
// coprocessor postNext hook
if (region != null && region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
}
}
// If the scanner's filter - if any - is done with the scan
// and wants to tell the client to stop the scan. This is done by passing
// a null result, and setting moreResults to false.
if (scanner.isFilterDone() && results.isEmpty()) {
moreResults = false;
results = null;
} else {
addResults(builder, results, controller);
}
} finally {
// We're done. On way out re-add the above removed lease.
// Adding resets expiration time on lease.
if (scanners.containsKey(scannerName)) {
if (lease != null) leases.addLease(lease);
ttl = this.scannerLeaseTimeoutPeriod;
}
}
}
if (!moreResults || closeScanner) {
ttl = 0;
moreResults = false;
if (region != null && region.getCoprocessorHost() != null) {
if (region.getCoprocessorHost().preScannerClose(scanner)) {
return builder.build(); // bypass
}
}
rsh = scanners.remove(scannerName);
if (rsh != null) {
scanner = rsh.s;
scanner.close();
leases.cancelLease(scannerName);
if (region != null && region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerClose(scanner);
}
}
}
if (ttl > 0) {
builder.setTtl(ttl);
}
builder.setScannerId(scannerId);
builder.setMoreResults(moreResults);
return builder.build();
} catch (IOException ie) {
if (scannerName != null && ie instanceof NotServingRegionException) {
scanners.remove(scannerName);
}
throw new ServiceException(ie);
}
}
@ -4322,9 +4315,11 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
private static class RegionScannerHolder {
private RegionScanner s;
private long nextCallSeq = 0L;
private HRegion r;
public RegionScannerHolder(RegionScanner s) {
public RegionScannerHolder(RegionScanner s, HRegion r) {
this.s = s;
this.r = r;
}
}

View File

@ -26,12 +26,23 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@ -424,6 +435,85 @@ public class TestScannersFromClientSide {
"Testing offset + multiple CFs + maxResults");
}
/**
* Test from client side for scan while the region is reopened
* on the same region server.
*
* @throws Exception
*/
@Test
public void testScanOnReopenedRegion() throws Exception {
byte [] TABLE = Bytes.toBytes("testScanOnReopenedRegion");
byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 2);
HTable ht = TEST_UTIL.createTable(TABLE, FAMILY);
Put put;
Scan scan;
Result result;
ResultScanner scanner;
boolean toLog = false;
List<Cell> kvListExp;
// table: row, family, c0:0, c1:1
put = new Put(ROW);
for (int i=0; i < QUALIFIERS.length; i++) {
KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE);
put.add(kv);
}
ht.put(put);
scan = new Scan(ROW);
scanner = ht.getScanner(scan);
HRegionLocation loc = ht.getRegionLocation(ROW);
HRegionInfo hri = loc.getRegionInfo();
MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
byte[] regionName = hri.getRegionName();
int i = cluster.getServerWith(regionName);
HRegionServer rs = cluster.getRegionServer(i);
ProtobufUtil.closeRegion(rs, regionName, false);
long startTime = EnvironmentEdgeManager.currentTimeMillis();
long timeOut = 300000;
while (true) {
if (rs.getOnlineRegion(regionName) == null) {
break;
}
assertTrue("Timed out in closing the testing region",
EnvironmentEdgeManager.currentTimeMillis() < startTime + timeOut);
Thread.sleep(500);
}
// Now open the region again.
ZooKeeperWatcher zkw = TEST_UTIL.getZooKeeperWatcher();
try {
HMaster master = cluster.getMaster();
RegionStates states = master.getAssignmentManager().getRegionStates();
states.regionOffline(hri);
states.updateRegionState(hri, State.OPENING);
ZKAssign.createNodeOffline(zkw, hri, loc.getServerName());
ProtobufUtil.openRegion(rs, hri);
startTime = EnvironmentEdgeManager.currentTimeMillis();
while (true) {
if (rs.getOnlineRegion(regionName) != null) {
break;
}
assertTrue("Timed out in open the testing region",
EnvironmentEdgeManager.currentTimeMillis() < startTime + timeOut);
Thread.sleep(500);
}
} finally {
ZKAssign.deleteNodeFailSilent(zkw, hri);
}
// c0:0, c1:1
kvListExp = new ArrayList<Cell>();
kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[0], 0, VALUE));
kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[1], 1, VALUE));
result = scanner.next();
verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region");
}
static void verifyResult(Result result, List<Cell> expKvList, boolean toLog,
String msg) {