HBASE-8909. HBASE-6170 broke hbase.regionserver.lease.period config, we should support it for BC
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1503484 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e84ad97903
commit
7fa275407f
|
@ -17,11 +17,16 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedList;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
|
@ -35,10 +40,6 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
|||
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedList;
|
||||
|
||||
/**
|
||||
* Implements the scanner interface for the HBase client.
|
||||
* If there are multiple regions in a table, this scanner will iterate
|
||||
|
@ -65,7 +66,7 @@ public class ClientScanner extends AbstractClientScanner {
|
|||
private final byte[] tableName;
|
||||
private final int scannerTimeout;
|
||||
private boolean scanMetricsPublished = false;
|
||||
|
||||
|
||||
/**
|
||||
* Create a new ClientScanner for the specified table. An HConnection will be
|
||||
* retrieved using the passed Configuration.
|
||||
|
@ -108,7 +109,9 @@ public class ClientScanner extends AbstractClientScanner {
|
|||
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
|
||||
}
|
||||
this.scannerTimeout = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
|
||||
this.scannerTimeout = HBaseConfiguration.getInt(conf,
|
||||
HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
|
||||
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
|
||||
|
||||
// check if application wants to collect scan metrics
|
||||
|
@ -253,6 +256,7 @@ public class ClientScanner extends AbstractClientScanner {
|
|||
scanMetricsPublished = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result next() throws IOException {
|
||||
// If the scanner is closed and there's nothing left in the cache, next is a no-op.
|
||||
if (cache.size() == 0 && this.closed) {
|
||||
|
@ -377,6 +381,7 @@ public class ClientScanner extends AbstractClientScanner {
|
|||
* if returned array is of zero-length (We never return null).
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public Result [] next(int nbRows) throws IOException {
|
||||
// Collect values to be returned here
|
||||
ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
|
||||
|
@ -391,6 +396,7 @@ public class ClientScanner extends AbstractClientScanner {
|
|||
return resultSets.toArray(new Result[resultSets.size()]);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (!scanMetricsPublished) writeScanMetrics();
|
||||
if (callable != null) {
|
||||
|
|
|
@ -150,6 +150,35 @@ public class HBaseConfiguration extends Configuration {
|
|||
return isShowConf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the value of the <code>name</code> property as an <code>int</code>, possibly
|
||||
* referring to the deprecated name of the configuration property.
|
||||
* If no such property exists, the provided default value is returned,
|
||||
* or if the specified value is not a valid <code>int</code>,
|
||||
* then an error is thrown.
|
||||
*
|
||||
* @param name property name.
|
||||
* @param deprecatedName a deprecatedName for the property to use
|
||||
* if non-deprecated name is not used
|
||||
* @param defaultValue default value.
|
||||
* @throws NumberFormatException when the value is invalid
|
||||
* @return property value as an <code>int</code>,
|
||||
* or <code>defaultValue</code>.
|
||||
*/
|
||||
// TODO: developer note: This duplicates the functionality of deprecated
|
||||
// property support in Configuration in Hadoop 2. But since Hadoop-1 does not
|
||||
// contain these changes, we will do our own as usual. Replace these when H2 is default.
|
||||
public static int getInt(Configuration conf, String name,
|
||||
String deprecatedName, int defaultValue) {
|
||||
if (conf.get(deprecatedName) != null) {
|
||||
LOG.warn(String.format("Config option \"%s\" is deprecated. Instead, use \"%s\""
|
||||
, deprecatedName, name));
|
||||
return conf.getInt(deprecatedName, defaultValue);
|
||||
} else {
|
||||
return conf.getInt(name, defaultValue);
|
||||
}
|
||||
}
|
||||
|
||||
/** For debugging. Dump configurations to system output as xml format.
|
||||
* Master and RS configurations can also be dumped using
|
||||
* http services. e.g. "curl http://master:60010/dump"
|
||||
|
|
|
@ -629,6 +629,14 @@ public final class HConstants {
|
|||
*/
|
||||
public static String HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD = "hbase.client.scanner.timeout.period";
|
||||
|
||||
/**
|
||||
* Use {@link #HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD} instead.
|
||||
* @deprecated This config option is deprecated. Will be removed at later releases after 0.96.
|
||||
*/
|
||||
@Deprecated
|
||||
public static String HBASE_REGIONSERVER_LEASE_PERIOD_KEY =
|
||||
"hbase.regionserver.lease.period";
|
||||
|
||||
/**
|
||||
* Default value of {@link #HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD}.
|
||||
*/
|
||||
|
|
|
@ -109,11 +109,11 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
|||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
|
||||
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
||||
import org.apache.hadoop.hbase.ipc.ServerRpcController;
|
||||
import org.apache.hadoop.hbase.master.SplitLogManager;
|
||||
import org.apache.hadoop.hbase.master.TableLockManager;
|
||||
|
@ -171,9 +171,9 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultCellMeta;
|
|||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
|
||||
|
@ -526,7 +526,9 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
|||
this.abortRequested = false;
|
||||
this.stopped = false;
|
||||
|
||||
this.scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
|
||||
this.scannerLeaseTimeoutPeriod = HBaseConfiguration.getInt(conf,
|
||||
HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
|
||||
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
|
||||
|
||||
// Server to handle client requests.
|
||||
|
@ -569,6 +571,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
|||
regionServerAccounting = new RegionServerAccounting();
|
||||
cacheConfig = new CacheConfig(conf);
|
||||
uncaughtExceptionHandler = new UncaughtExceptionHandler() {
|
||||
@Override
|
||||
public void uncaughtException(Thread t, Throwable e) {
|
||||
abort("Uncaught exception in service thread " + t.getName(), e);
|
||||
}
|
||||
|
@ -772,6 +775,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
|||
/**
|
||||
* The HRegionServer sticks in this loop until closed.
|
||||
*/
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
// Do pre-registration initializations; zookeeper, lease threads, etc.
|
||||
|
@ -1216,6 +1220,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
|||
ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegionServerAccounting getRegionServerAccounting() {
|
||||
return regionServerAccounting;
|
||||
}
|
||||
|
@ -1577,7 +1582,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
|||
// quite a while inside HConnection layer. The worker won't be available for other
|
||||
// tasks even after current task is preempted after a split task times out.
|
||||
Configuration sinkConf = HBaseConfiguration.create(conf);
|
||||
sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
|
||||
sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
|
||||
conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
|
||||
sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||
conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
|
||||
|
@ -1740,6 +1745,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
|||
* @param cause
|
||||
* the exception that caused the abort, or null
|
||||
*/
|
||||
@Override
|
||||
public void abort(String reason, Throwable cause) {
|
||||
String msg = "ABORTING region server " + this + ": " + reason;
|
||||
if (cause != null) {
|
||||
|
@ -1781,6 +1787,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
|||
abort(reason, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAborted() {
|
||||
return this.abortRequested;
|
||||
}
|
||||
|
@ -2024,6 +2031,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
|||
/**
|
||||
* @return true if a stop has been requested.
|
||||
*/
|
||||
@Override
|
||||
public boolean isStopped() {
|
||||
return this.stopped;
|
||||
}
|
||||
|
@ -2033,6 +2041,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
|||
return this.stopping;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, HRegion> getRecoveringRegions() {
|
||||
return this.recoveringRegions;
|
||||
}
|
||||
|
@ -2041,6 +2050,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
|||
*
|
||||
* @return the configuration
|
||||
*/
|
||||
@Override
|
||||
public Configuration getConfiguration() {
|
||||
return conf;
|
||||
}
|
||||
|
@ -2081,6 +2091,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
|||
// we'll sort the regions in reverse
|
||||
SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>(
|
||||
new Comparator<Long>() {
|
||||
@Override
|
||||
public int compare(Long a, Long b) {
|
||||
return -1 * a.compareTo(b);
|
||||
}
|
||||
|
@ -2100,6 +2111,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
|||
}
|
||||
|
||||
/** @return reference to FlushRequester */
|
||||
@Override
|
||||
public FlushRequester getFlushRequester() {
|
||||
return this.cacheFlusher;
|
||||
}
|
||||
|
@ -2140,10 +2152,12 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
|||
/**
|
||||
* @return Return the fs.
|
||||
*/
|
||||
@Override
|
||||
public FileSystem getFileSystem() {
|
||||
return fs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getServerName().toString();
|
||||
}
|
||||
|
@ -2184,10 +2198,12 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
|||
}
|
||||
|
||||
|
||||
@Override
|
||||
public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
|
||||
return this.regionsInTransitionInRS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExecutorService getExecutorService() {
|
||||
return service;
|
||||
}
|
||||
|
@ -2323,7 +2339,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
|||
* @param tableName
|
||||
* @return Online regions from <code>tableName</code>
|
||||
*/
|
||||
public List<HRegion> getOnlineRegions(byte[] tableName) {
|
||||
@Override
|
||||
public List<HRegion> getOnlineRegions(byte[] tableName) {
|
||||
List<HRegion> tableRegions = new ArrayList<HRegion>();
|
||||
synchronized (this.onlineRegions) {
|
||||
for (HRegion region: this.onlineRegions.values()) {
|
||||
|
@ -2358,6 +2375,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
|||
this.scannerName = n;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void leaseExpired() {
|
||||
RegionScannerHolder rsh = scanners.remove(this.scannerName);
|
||||
if (rsh != null) {
|
||||
|
@ -2637,6 +2655,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
|||
*
|
||||
* @return True if we OOME'd and are aborting.
|
||||
*/
|
||||
@Override
|
||||
public boolean checkOOME(final Throwable e) {
|
||||
boolean stop = false;
|
||||
try {
|
||||
|
@ -4241,7 +4260,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
|||
nodePath = ZKUtil.joinZNode(nodePath, previousRSName);
|
||||
ZKUtil.setData(zkw, nodePath,
|
||||
ZKUtil.regionSequenceIdsToByteArray(minSeqIdForLogReplay, maxSeqIdInStores));
|
||||
LOG.debug("Update last flushed sequence id of region " + region.getEncodedName() + " for "
|
||||
LOG.debug("Update last flushed sequence id of region " + region.getEncodedName() + " for "
|
||||
+ previousRSName);
|
||||
} else {
|
||||
LOG.warn("Can't find failed region server for recovering region " + region.getEncodedName());
|
||||
|
|
Loading…
Reference in New Issue