HBASE-1609 We wait on leases to expire before regionserver goes down. Rather, just let client fail

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@794956 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2009-07-17 04:27:11 +00:00
parent ef30d39681
commit aa4df127a9
4 changed files with 29 additions and 41 deletions

View File

@ -491,6 +491,8 @@ Release 0.20.0 - Unreleased
HBASE-1662 Tool to run major compaction on catalog regions when hbase is HBASE-1662 Tool to run major compaction on catalog regions when hbase is
shutdown shutdown
HBASE-1665 expose more load information to the client side HBASE-1665 expose more load information to the client side
HBASE-1609 We wait on leases to expire before regionserver goes down.
Rather, just let client fail
OPTIMIZATIONS OPTIMIZATIONS
HBASE-1412 Change values for delete column and column family in KeyValue HBASE-1412 Change values for delete column and column family in KeyValue

View File

@ -21,7 +21,10 @@ package org.apache.hadoop.hbase;
/** /**
* Thrown if a region server is passed an unknown scanner id * Thrown if a region server is passed an unknown scanner id.
* Usually means the client has take too long between checkins and so the
* scanner lease on the serverside has expired OR the serverside is closing
* down and has cancelled all leases.
*/ */
public class UnknownScannerException extends DoNotRetryIOException { public class UnknownScannerException extends DoNotRetryIOException {
private static final long serialVersionUID = 993179627856392526L; private static final long serialVersionUID = 993179627856392526L;

View File

@ -30,7 +30,6 @@ import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -198,7 +197,6 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
new ReentrantReadWriteLock(); new ReentrantReadWriteLock();
private final Object splitLock = new Object(); private final Object splitLock = new Object();
private long minSequenceId; private long minSequenceId;
final AtomicInteger activeScannerCount = new AtomicInteger(0);
/** /**
* Name of the region info file that resides just under the region directory. * Name of the region info file that resides just under the region directory.
@ -466,19 +464,6 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
} }
newScannerLock.writeLock().lock(); newScannerLock.writeLock().lock();
try { try {
// Wait for active scanners to finish. The write lock we hold will
// prevent new scanners from being created.
synchronized (activeScannerCount) {
while (activeScannerCount.get() != 0) {
LOG.debug("waiting for " + activeScannerCount.get() +
" scanners to finish");
try {
activeScannerCount.wait();
} catch (InterruptedException e) {
// continue
}
}
}
splitsAndClosesLock.writeLock().lock(); splitsAndClosesLock.writeLock().lock();
LOG.debug("Updates disabled for region, no outstanding scanners on " + LOG.debug("Updates disabled for region, no outstanding scanners on " +
this); this);
@ -1690,8 +1675,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
* It is used to combine scanners from multiple Stores (aka column families). * It is used to combine scanners from multiple Stores (aka column families).
*/ */
class RegionScanner implements InternalScanner { class RegionScanner implements InternalScanner {
private KeyValueHeap storeHeap; private final KeyValueHeap storeHeap;
private byte [] stopRow; private final byte [] stopRow;
RegionScanner(Scan scan) { RegionScanner(Scan scan) {
if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) { if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
@ -1708,10 +1693,6 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
} }
this.storeHeap = this.storeHeap =
new KeyValueHeap(scanners.toArray(new KeyValueScanner[0]), comparator); new KeyValueHeap(scanners.toArray(new KeyValueScanner[0]), comparator);
// As we have now successfully completed initialization, increment the
// activeScanner count.
activeScannerCount.incrementAndGet();
} }
/** /**
@ -1763,23 +1744,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
} }
public void close() { public void close() {
try {
storeHeap.close(); storeHeap.close();
} finally {
synchronized (activeScannerCount) {
int count = activeScannerCount.decrementAndGet();
if (count < 0) {
LOG.error("active scanner count less than zero: " + count +
" resetting to zero");
activeScannerCount.set(0);
count = 0;
}
if (count == 0) {
activeScannerCount.notifyAll();
}
}
}
} }
/** /**
* *
* @param scanner to be closed * @param scanner to be closed

View File

@ -1623,6 +1623,15 @@ public class HRegionServer implements HConstants, HRegionInterface,
} finally { } finally {
this.lock.writeLock().unlock(); this.lock.writeLock().unlock();
} }
// Close any outstanding scanners. Means they'll get an UnknownScanner
// exception next time they come in.
for (Map.Entry<String, InternalScanner> e: this.scanners.entrySet()) {
try {
e.getValue().close();
} catch (IOException ioe) {
LOG.warn("Closing scanner " + e.getKey(), ioe);
}
}
for (HRegion region: regionsToClose) { for (HRegion region: regionsToClose) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("closing region " + Bytes.toString(region.getRegionName())); LOG.debug("closing region " + Bytes.toString(region.getRegionName()));
@ -1880,15 +1889,22 @@ public class HRegionServer implements HConstants, HRegionInterface,
} }
public Result [] next(final long scannerId, int nbRows) throws IOException { public Result [] next(final long scannerId, int nbRows) throws IOException {
checkOpen();
List<Result> results = new ArrayList<Result>();
try { try {
String scannerName = String.valueOf(scannerId); String scannerName = String.valueOf(scannerId);
InternalScanner s = scanners.get(scannerName); InternalScanner s = scanners.get(scannerName);
if (s == null) { if (s == null) {
throw new UnknownScannerException("Name: " + scannerName); throw new UnknownScannerException("Name: " + scannerName);
} }
try {
checkOpen();
} catch (IOException e) {
// If checkOpen failed, cancel this lease; filesystem is gone or we're
// closing or something.
this.leases.cancelLease(scannerName);
throw e;
}
this.leases.renewLease(scannerName); this.leases.renewLease(scannerName);
List<Result> results = new ArrayList<Result>();
for (int i = 0; i < nbRows; i++) { for (int i = 0; i < nbRows; i++) {
requestCount.incrementAndGet(); requestCount.incrementAndGet();
// Collect values to be returned here // Collect values to be returned here