HBASE-2077 NullPointerException with an open scanner that expired causing an immediate region server shutdown -- part 2
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1138180 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
34d2fa085f
commit
791659b27e
|
@ -343,6 +343,8 @@ Release 0.90.4 - Unreleased
|
|||
(Li Pi)
|
||||
HBASE-3988 Infinite loop for secondary master (Liyin Tang)
|
||||
HBASE-3995 HBASE-3946 broke TestMasterFailover
|
||||
HBASE-2077 NullPointerException with an open scanner that expired causing
|
||||
an immediate region server shutdown -- part 2.
|
||||
|
||||
|
||||
IMPROVEMENT
|
||||
|
|
|
@ -1943,12 +1943,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
|||
}
|
||||
|
||||
public Result[] next(final long scannerId, int nbRows) throws IOException {
|
||||
try {
|
||||
String scannerName = String.valueOf(scannerId);
|
||||
InternalScanner s = this.scanners.get(scannerName);
|
||||
if (s == null) {
|
||||
throw new UnknownScannerException("Name: " + scannerName);
|
||||
}
|
||||
if (s == null) throw new UnknownScannerException("Name: " + scannerName);
|
||||
try {
|
||||
checkOpen();
|
||||
} catch (IOException e) {
|
||||
|
@ -1962,7 +1959,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
|||
}
|
||||
throw e;
|
||||
}
|
||||
this.leases.renewLease(scannerName);
|
||||
Leases.Lease lease = null;
|
||||
try {
|
||||
// Remove lease while its being processed in server; protects against case
|
||||
// where processing of request takes > lease expiration time.
|
||||
lease = this.leases.removeLease(scannerName);
|
||||
List<Result> results = new ArrayList<Result>(nbRows);
|
||||
long currentScanResultSize = 0;
|
||||
List<KeyValue> values = new ArrayList<KeyValue>();
|
||||
|
@ -2024,10 +2025,15 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
|||
: results.toArray(new Result[0]);
|
||||
} catch (Throwable t) {
|
||||
if (t instanceof NotServingRegionException) {
|
||||
String scannerName = String.valueOf(scannerId);
|
||||
this.scanners.remove(scannerName);
|
||||
}
|
||||
throw convertThrowableToIOE(cleanup(t));
|
||||
} finally {
|
||||
// We're done. On way out readd the above removed lease. Adding resets
|
||||
// expiration time on lease.
|
||||
if (this.scanners.containsKey(scannerName)) {
|
||||
if (lease != null) this.leases.addLease(lease);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -140,16 +140,24 @@ public class Leases extends Thread {
|
|||
*/
|
||||
public void createLease(String leaseName, final LeaseListener listener)
|
||||
throws LeaseStillHeldException {
|
||||
if (stopRequested) {
|
||||
addLease(new Lease(leaseName, listener));
|
||||
}
|
||||
|
||||
/**
|
||||
* Inserts lease. Resets expiration before insertion.
|
||||
* @param lease
|
||||
* @throws LeaseStillHeldException
|
||||
*/
|
||||
public void addLease(final Lease lease) throws LeaseStillHeldException {
|
||||
if (this.stopRequested) {
|
||||
return;
|
||||
}
|
||||
Lease lease = new Lease(leaseName, listener,
|
||||
System.currentTimeMillis() + leasePeriod);
|
||||
lease.setExpirationTime(System.currentTimeMillis() + this.leasePeriod);
|
||||
synchronized (leaseQueue) {
|
||||
if (leases.containsKey(leaseName)) {
|
||||
throw new LeaseStillHeldException(leaseName);
|
||||
if (leases.containsKey(lease.getLeaseName())) {
|
||||
throw new LeaseStillHeldException(lease.getLeaseName());
|
||||
}
|
||||
leases.put(leaseName, lease);
|
||||
leases.put(lease.getLeaseName(), lease);
|
||||
leaseQueue.add(lease);
|
||||
}
|
||||
}
|
||||
|
@ -198,26 +206,44 @@ public class Leases extends Thread {
|
|||
|
||||
/**
|
||||
* Client explicitly cancels a lease.
|
||||
*
|
||||
* @param leaseName name of lease
|
||||
* @throws LeaseException
|
||||
*/
|
||||
public void cancelLease(final String leaseName) throws LeaseException {
|
||||
removeLease(leaseName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove named lease.
|
||||
* Lease is removed from the list of leases and removed from the delay queue.
|
||||
* Lease can be resinserted using {@link #addLease(Lease)}
|
||||
*
|
||||
* @param leaseName name of lease
|
||||
* @throws LeaseException
|
||||
* @return Removed lease
|
||||
*/
|
||||
Lease removeLease(final String leaseName) throws LeaseException {
|
||||
Lease lease = null;
|
||||
synchronized (leaseQueue) {
|
||||
Lease lease = leases.remove(leaseName);
|
||||
lease = leases.remove(leaseName);
|
||||
if (lease == null) {
|
||||
throw new LeaseException("lease '" + leaseName + "' does not exist");
|
||||
}
|
||||
leaseQueue.remove(lease);
|
||||
}
|
||||
return lease;
|
||||
}
|
||||
|
||||
/** This class tracks a single Lease. */
|
||||
private static class Lease implements Delayed {
|
||||
static class Lease implements Delayed {
|
||||
private final String leaseName;
|
||||
private final LeaseListener listener;
|
||||
private long expirationTime;
|
||||
|
||||
Lease(final String leaseName, LeaseListener listener) {
|
||||
this(leaseName, listener, 0);
|
||||
}
|
||||
|
||||
Lease(final String leaseName, LeaseListener listener, long expirationTime) {
|
||||
this.leaseName = leaseName;
|
||||
this.listener = listener;
|
||||
|
@ -269,14 +295,5 @@ public class Leases extends Thread {
|
|||
public void setExpirationTime(long expirationTime) {
|
||||
this.expirationTime = expirationTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the expiration time for that lease
|
||||
* @return expiration time
|
||||
*/
|
||||
public long getExpirationTime() {
|
||||
return this.expirationTime;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue