HBASE-6170 Timeouts for row lock and scan should be separate (Chris Trezzo)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1354325 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2012-06-27 04:59:27 +00:00
parent 275bfdcc40
commit 9214f95cf4
7 changed files with 74 additions and 50 deletions

View File

@ -536,16 +536,25 @@ public final class HConstants {
public static String HBASE_CLIENT_INSTANCE_ID = "hbase.client.instance.id"; public static String HBASE_CLIENT_INSTANCE_ID = "hbase.client.instance.id";
/** /**
* HRegion server lease period in milliseconds. Clients must report in within this period * The row lock timeout period in milliseconds.
* else they are considered dead. Unit measured in ms (milliseconds).
*/ */
public static String HBASE_REGIONSERVER_LEASE_PERIOD_KEY = public static String HBASE_REGIONSERVER_ROWLOCK_TIMEOUT_PERIOD =
"hbase.regionserver.lease.period"; "hbase.regionserver.rowlock.timeout.period";
/** /**
* Default value of {@link #HBASE_REGIONSERVER_LEASE_PERIOD_KEY}. * Default value of {@link #HBASE_REGIONSERVER_ROWLOCK_TIMEOUT_PERIOD}.
*/ */
public static long DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD = 60000; public static int DEFAULT_HBASE_REGIONSERVER_ROWLOCK_TIMEOUT_PERIOD = 60000;
/**
* The client scanner timeout period in milliseconds.
*/
public static String HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD = "hbase.client.scanner.timeout.period";
/**
* Default value of {@link #HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD}.
*/
public static int DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD = 60000;
/** /**
* timeout for each RPC * timeout for each RPC

View File

@ -106,9 +106,8 @@ public class ClientScanner extends AbstractClientScanner {
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
} }
this.scannerTimeout = (int) conf.getLong( this.scannerTimeout = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
// check if application wants to collect scan metrics // check if application wants to collect scan metrics
byte[] enableMetrics = scan.getAttribute( byte[] enableMetrics = scan.getAttribute(

View File

@ -422,6 +422,16 @@ public class HRegionServer implements ClientProtocol,
*/ */
private MovedRegionsCleaner movedRegionsCleaner; private MovedRegionsCleaner movedRegionsCleaner;
/**
* The lease timeout period for row locks (milliseconds).
*/
private final int rowLockLeaseTimeoutPeriod;
/**
* The lease timeout period for client scanners (milliseconds).
*/
private final int scannerLeaseTimeoutPeriod;
/** /**
* Starts a HRegionServer at the default location * Starts a HRegionServer at the default location
@ -466,6 +476,13 @@ public class HRegionServer implements ClientProtocol,
this.abortRequested = false; this.abortRequested = false;
this.stopped = false; this.stopped = false;
this.rowLockLeaseTimeoutPeriod = conf.getInt(
HConstants.HBASE_REGIONSERVER_ROWLOCK_TIMEOUT_PERIOD,
HConstants.DEFAULT_HBASE_REGIONSERVER_ROWLOCK_TIMEOUT_PERIOD);
this.scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
// Server to handle client requests. // Server to handle client requests.
String hostname = Strings.domainNamePointerToHostName(DNS.getDefaultHost( String hostname = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
conf.get("hbase.regionserver.dns.interface", "default"), conf.get("hbase.regionserver.dns.interface", "default"),
@ -705,10 +722,7 @@ public class HRegionServer implements ClientProtocol,
this.compactionChecker = new CompactionChecker(this, this.compactionChecker = new CompactionChecker(this,
this.threadWakeFrequency * multiplier, this); this.threadWakeFrequency * multiplier, this);
this.leases = new Leases((int) conf.getLong( this.leases = new Leases(this.threadWakeFrequency);
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD),
this.threadWakeFrequency);
// Create the thread for the ThriftServer. // Create the thread for the ThriftServer.
if (conf.getBoolean("hbase.regionserver.export.thrift", false)) { if (conf.getBoolean("hbase.regionserver.export.thrift", false)) {
@ -2658,7 +2672,8 @@ public class HRegionServer implements ClientProtocol,
long lockId = nextLong(); long lockId = nextLong();
String lockName = String.valueOf(lockId); String lockName = String.valueOf(lockId);
rowlocks.put(lockName, r); rowlocks.put(lockName, r);
this.leases.createLease(lockName, new RowLockListener(lockName, region)); this.leases.createLease(lockName, this.rowLockLeaseTimeoutPeriod, new RowLockListener(lockName,
region));
return lockId; return lockId;
} }
@ -2666,7 +2681,8 @@ public class HRegionServer implements ClientProtocol,
long scannerId = nextLong(); long scannerId = nextLong();
String scannerName = String.valueOf(scannerId); String scannerName = String.valueOf(scannerId);
scanners.put(scannerName, s); scanners.put(scannerName, s);
this.leases.createLease(scannerName, new ScannerListener(scannerName)); this.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod, new ScannerListener(
scannerName));
return scannerId; return scannerId;
} }
@ -2925,7 +2941,7 @@ public class HRegionServer implements ClientProtocol,
} }
scannerId = addScanner(scanner); scannerId = addScanner(scanner);
scannerName = String.valueOf(scannerId); scannerName = String.valueOf(scannerId);
ttl = leases.leasePeriod; ttl = this.scannerLeaseTimeoutPeriod;
} }
if (rows > 0) { if (rows > 0) {
@ -2999,7 +3015,7 @@ public class HRegionServer implements ClientProtocol,
// Adding resets expiration time on lease. // Adding resets expiration time on lease.
if (scanners.containsKey(scannerName)) { if (scanners.containsKey(scannerName)) {
if (lease != null) leases.addLease(lease); if (lease != null) leases.addLease(lease);
ttl = leases.leasePeriod; ttl = this.scannerLeaseTimeoutPeriod;
} }
} }
} }

View File

@ -55,7 +55,6 @@ import java.io.IOException;
@InterfaceAudience.Private @InterfaceAudience.Private
public class Leases extends HasThread { public class Leases extends HasThread {
private static final Log LOG = LogFactory.getLog(Leases.class.getName()); private static final Log LOG = LogFactory.getLog(Leases.class.getName());
protected final int leasePeriod;
private final int leaseCheckFrequency; private final int leaseCheckFrequency;
private volatile DelayQueue<Lease> leaseQueue = new DelayQueue<Lease>(); private volatile DelayQueue<Lease> leaseQueue = new DelayQueue<Lease>();
protected final Map<String, Lease> leases = new HashMap<String, Lease>(); protected final Map<String, Lease> leases = new HashMap<String, Lease>();
@ -63,13 +62,11 @@ public class Leases extends HasThread {
/** /**
* Creates a lease monitor * Creates a lease monitor
* *
* @param leasePeriod - length of time (milliseconds) that the lease is valid
* @param leaseCheckFrequency - how often the lease should be checked * @param leaseCheckFrequency - how often the lease should be checked
* (milliseconds) * (milliseconds)
*/ */
public Leases(final int leasePeriod, final int leaseCheckFrequency) { public Leases(final int leaseCheckFrequency) {
this.leasePeriod = leasePeriod;
this.leaseCheckFrequency = leaseCheckFrequency; this.leaseCheckFrequency = leaseCheckFrequency;
setDaemon(true); setDaemon(true);
} }
@ -135,15 +132,16 @@ public class Leases extends HasThread {
} }
/** /**
* Obtain a lease * Obtain a lease.
* *
* @param leaseName name of the lease * @param leaseName name of the lease
* @param leaseTimeoutPeriod length of the lease in milliseconds
* @param listener listener that will process lease expirations * @param listener listener that will process lease expirations
* @throws LeaseStillHeldException * @throws LeaseStillHeldException
*/ */
public void createLease(String leaseName, final LeaseListener listener) public void createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener)
throws LeaseStillHeldException { throws LeaseStillHeldException {
addLease(new Lease(leaseName, listener)); addLease(new Lease(leaseName, leaseTimeoutPeriod, listener));
} }
/** /**
@ -155,7 +153,7 @@ public class Leases extends HasThread {
if (this.stopRequested) { if (this.stopRequested) {
return; return;
} }
lease.setExpirationTime(System.currentTimeMillis() + this.leasePeriod); lease.resetExpirationTime();
synchronized (leaseQueue) { synchronized (leaseQueue) {
if (leases.containsKey(lease.getLeaseName())) { if (leases.containsKey(lease.getLeaseName())) {
throw new LeaseStillHeldException(lease.getLeaseName()); throw new LeaseStillHeldException(lease.getLeaseName());
@ -202,7 +200,7 @@ public class Leases extends HasThread {
throw new LeaseException("lease '" + leaseName + throw new LeaseException("lease '" + leaseName +
"' does not exist or has already expired"); "' does not exist or has already expired");
} }
lease.setExpirationTime(System.currentTimeMillis() + leasePeriod); lease.resetExpirationTime();
leaseQueue.add(lease); leaseQueue.add(lease);
} }
} }
@ -241,16 +239,14 @@ public class Leases extends HasThread {
static class Lease implements Delayed { static class Lease implements Delayed {
private final String leaseName; private final String leaseName;
private final LeaseListener listener; private final LeaseListener listener;
private int leaseTimeoutPeriod;
private long expirationTime; private long expirationTime;
Lease(final String leaseName, LeaseListener listener) { Lease(final String leaseName, int leaseTimeoutPeriod, LeaseListener listener) {
this(leaseName, listener, 0);
}
Lease(final String leaseName, LeaseListener listener, long expirationTime) {
this.leaseName = leaseName; this.leaseName = leaseName;
this.listener = listener; this.listener = listener;
this.expirationTime = expirationTime; this.leaseTimeoutPeriod = leaseTimeoutPeriod;
this.expirationTime = 0;
} }
/** @return the lease name */ /** @return the lease name */
@ -294,9 +290,11 @@ public class Leases extends HasThread {
return this.equals(o) ? 0 : (delta > 0 ? 1 : -1); return this.equals(o) ? 0 : (delta > 0 ? 1 : -1);
} }
/** @param expirationTime the expirationTime to set */ /**
public void setExpirationTime(long expirationTime) { * Resets the expiration time of the lease.
this.expirationTime = expirationTime; */
public void resetExpirationTime() {
this.expirationTime = System.currentTimeMillis() + this.leaseTimeoutPeriod;
} }
} }
} }

View File

@ -25,7 +25,9 @@ import static org.junit.Assert.fail;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -48,7 +50,7 @@ public class TestScannerTimeout {
private final static byte[] SOME_BYTES = Bytes.toBytes("f"); private final static byte[] SOME_BYTES = Bytes.toBytes("f");
private final static byte[] TABLE_NAME = Bytes.toBytes("t"); private final static byte[] TABLE_NAME = Bytes.toBytes("t");
private final static int NB_ROWS = 10; private final static int NB_ROWS = 10;
// Be careful w/ what you set this timer too... it can get in the way of // Be careful w/ what you set this timer to... it can get in the way of
// the mini cluster coming up -- the verification in particular. // the mini cluster coming up -- the verification in particular.
private final static int SCANNER_TIMEOUT = 10000; private final static int SCANNER_TIMEOUT = 10000;
private final static int SCANNER_CACHING = 5; private final static int SCANNER_CACHING = 5;
@ -59,7 +61,7 @@ public class TestScannerTimeout {
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
Configuration c = TEST_UTIL.getConfiguration(); Configuration c = TEST_UTIL.getConfiguration();
c.setInt("hbase.regionserver.lease.period", SCANNER_TIMEOUT); c.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SCANNER_TIMEOUT);
// We need more than one region server for this test // We need more than one region server for this test
TEST_UTIL.startMiniCluster(2); TEST_UTIL.startMiniCluster(2);
HTable table = TEST_UTIL.createTable(TABLE_NAME, SOME_BYTES); HTable table = TEST_UTIL.createTable(TABLE_NAME, SOME_BYTES);
@ -134,8 +136,7 @@ public class TestScannerTimeout {
// Since the RS is already created, this conf is client-side only for // Since the RS is already created, this conf is client-side only for
// this new table // this new table
Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setInt( conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SCANNER_TIMEOUT * 100);
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, SCANNER_TIMEOUT*100);
HTable higherScanTimeoutTable = new HTable(conf, TABLE_NAME); HTable higherScanTimeoutTable = new HTable(conf, TABLE_NAME);
ResultScanner r = higherScanTimeoutTable.getScanner(scan); ResultScanner r = higherScanTimeoutTable.getScanner(scan);
// This takes way less than SCANNER_TIMEOUT*100 // This takes way less than SCANNER_TIMEOUT*100
@ -201,8 +202,7 @@ public class TestScannerTimeout {
// Since the RS is already created, this conf is client-side only for // Since the RS is already created, this conf is client-side only for
// this new table // this new table
Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setInt( conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SCANNER_TIMEOUT * 100);
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, SCANNER_TIMEOUT*100);
HTable higherScanTimeoutTable = new HTable(conf, TABLE_NAME); HTable higherScanTimeoutTable = new HTable(conf, TABLE_NAME);
ResultScanner r = higherScanTimeoutTable.getScanner(scan); ResultScanner r = higherScanTimeoutTable.getScanner(scan);
int count = 1; int count = 1;

View File

@ -325,8 +325,9 @@ public class TestCoprocessorInterface extends HBaseTestCase {
// Make lease timeout longer, lease checks less frequent // Make lease timeout longer, lease checks less frequent
TEST_UTIL.getConfiguration().setInt( TEST_UTIL.getConfiguration().setInt(
"hbase.master.lease.thread.wakefrequency", 5 * 1000); "hbase.master.lease.thread.wakefrequency", 5 * 1000);
TEST_UTIL.getConfiguration().setInt( TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
"hbase.regionserver.lease.period", 10 * 1000); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_REGIONSERVER_ROWLOCK_TIMEOUT_PERIOD,
10 * 1000);
// Increase the amount of time between client retries // Increase the amount of time between client retries
TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 15 * 1000); TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 15 * 1000);
// This size should make it so we always split using the addContent // This size should make it so we always split using the addContent

View File

@ -3797,7 +3797,8 @@ public class TestHRegion extends HBaseTestCase {
// Make lease timeout longer, lease checks less frequent // Make lease timeout longer, lease checks less frequent
conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000); conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
conf.setInt(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, 10 * 1000); conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
conf.setInt(HConstants.HBASE_REGIONSERVER_ROWLOCK_TIMEOUT_PERIOD, 10 * 1000);
// Increase the amount of time between client retries // Increase the amount of time between client retries
conf.setLong("hbase.client.pause", 15 * 1000); conf.setLong("hbase.client.pause", 15 * 1000);