HBASE-7590 Add a costless notifications mechanism from master to regionservers & clients

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1458184 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
nkeywal 2013-03-19 09:24:01 +00:00
parent 955e0232ba
commit 17e604efbf
30 changed files with 420 additions and 125 deletions

View File

@ -80,6 +80,10 @@
<groupId>org.cloudera.htrace</groupId> <groupId>org.cloudera.htrace</groupId>
<artifactId>htrace</artifactId> <artifactId>htrace</artifactId>
</dependency> </dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</dependency>
</dependencies> </dependencies>
<profiles> <profiles>

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.Sleeper; import org.apache.hadoop.hbase.util.Sleeper;
@ -48,10 +49,22 @@ public abstract class Chore extends HasThread {
*/ */
public Chore(String name, final int p, final Stoppable stopper) { public Chore(String name, final int p, final Stoppable stopper) {
super(name); super(name);
if (stopper == null){
throw new NullPointerException("stopper cannot be null");
}
this.sleeper = new Sleeper(p, stopper); this.sleeper = new Sleeper(p, stopper);
this.stopper = stopper; this.stopper = stopper;
} }
/**
* This constructor is for test only. It allows to create an object and to call chore() on
* it. There is no sleeper nor stoppable.
*/
protected Chore(){
sleeper = null;
stopper = null;
}
/** /**
* @see java.lang.Thread#run() * @see java.lang.Thread#run()
*/ */
@ -60,7 +73,7 @@ public abstract class Chore extends HasThread {
try { try {
boolean initialChoreComplete = false; boolean initialChoreComplete = false;
while (!this.stopper.isStopped()) { while (!this.stopper.isStopped()) {
long startTime = System.currentTimeMillis(); long startTime = EnvironmentEdgeManager.currentTimeMillis();
try { try {
if (!initialChoreComplete) { if (!initialChoreComplete) {
initialChoreComplete = initialChore(); initialChoreComplete = initialChore();

View File

@ -34,13 +34,14 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Re
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.VersionedWritable; import org.apache.hadoop.io.VersionedWritable;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map; import java.util.Map;
/** /**
* Status information on the HBase cluster. * Status information on the HBase cluster.
* <p> * <p>
@ -82,7 +83,7 @@ public class ClusterStatus extends VersionedWritable {
private Map<String, RegionState> intransition; private Map<String, RegionState> intransition;
private String clusterId; private String clusterId;
private String[] masterCoprocessors; private String[] masterCoprocessors;
private boolean balancerOn; private Boolean balancerOn;
/** /**
* Constructor, for Writable * Constructor, for Writable
@ -100,7 +101,7 @@ public class ClusterStatus extends VersionedWritable {
final Collection<ServerName> backupMasters, final Collection<ServerName> backupMasters,
final Map<String, RegionState> rit, final Map<String, RegionState> rit,
final String[] masterCoprocessors, final String[] masterCoprocessors,
final boolean balancerOn) { final Boolean balancerOn) {
this.hbaseVersion = hbaseVersion; this.hbaseVersion = hbaseVersion;
this.liveServers = servers; this.liveServers = servers;
@ -267,27 +268,39 @@ public class ClusterStatus extends VersionedWritable {
public boolean isBalancerOn() { public boolean isBalancerOn() {
return balancerOn != null && balancerOn;
}
public Boolean getBalancerOn(){
return balancerOn; return balancerOn;
} }
/** /**
* Convert a ClutserStatus to a protobuf ClusterStatus * Convert a ClusterStatus to a protobuf ClusterStatus
* *
* @return the protobuf ClusterStatus * @return the protobuf ClusterStatus
*/ */
public ClusterStatusProtos.ClusterStatus convert() { public ClusterStatusProtos.ClusterStatus convert() {
ClusterStatusProtos.ClusterStatus.Builder builder = ClusterStatusProtos.ClusterStatus.newBuilder(); ClusterStatusProtos.ClusterStatus.Builder builder =
ClusterStatusProtos.ClusterStatus.newBuilder();
builder.setHbaseVersion(HBaseVersionFileContent.newBuilder().setVersion(getHBaseVersion())); builder.setHbaseVersion(HBaseVersionFileContent.newBuilder().setVersion(getHBaseVersion()));
if (liveServers != null){
for (Map.Entry<ServerName, ServerLoad> entry : liveServers.entrySet()) { for (Map.Entry<ServerName, ServerLoad> entry : liveServers.entrySet()) {
LiveServerInfo.Builder lsi = LiveServerInfo.Builder lsi =
LiveServerInfo.newBuilder().setServer(ProtobufUtil.toServerName(entry.getKey())); LiveServerInfo.newBuilder().setServer(ProtobufUtil.toServerName(entry.getKey()));
lsi.setServerLoad(entry.getValue().obtainServerLoadPB()); lsi.setServerLoad(entry.getValue().obtainServerLoadPB());
builder.addLiveServers(lsi.build()); builder.addLiveServers(lsi.build());
} }
for (ServerName deadServer : getDeadServerNames()) { }
if (deadServers != null){
for (ServerName deadServer : deadServers) {
builder.addDeadServers(ProtobufUtil.toServerName(deadServer)); builder.addDeadServers(ProtobufUtil.toServerName(deadServer));
} }
}
if (intransition != null) {
for (Map.Entry<String, RegionState> rit : getRegionsInTransition().entrySet()) { for (Map.Entry<String, RegionState> rit : getRegionsInTransition().entrySet()) {
ClusterStatusProtos.RegionState rs = rit.getValue().convert(); ClusterStatusProtos.RegionState rs = rit.getValue().convert();
RegionSpecifier.Builder spec = RegionSpecifier.Builder spec =
@ -298,16 +311,32 @@ public class ClusterStatus extends VersionedWritable {
RegionInTransition.newBuilder().setSpec(spec.build()).setRegionState(rs).build(); RegionInTransition.newBuilder().setSpec(spec.build()).setRegionState(rs).build();
builder.addRegionsInTransition(pbRIT); builder.addRegionsInTransition(pbRIT);
} }
builder.setClusterId(new ClusterId(getClusterId()).convert()); }
for (String coprocessor : getMasterCoprocessors()) {
if (clusterId != null) {
builder.setClusterId(new ClusterId(clusterId).convert());
}
if (masterCoprocessors != null) {
for (String coprocessor : masterCoprocessors) {
builder.addMasterCoprocessors(HBaseProtos.Coprocessor.newBuilder().setName(coprocessor)); builder.addMasterCoprocessors(HBaseProtos.Coprocessor.newBuilder().setName(coprocessor));
} }
builder.setMaster( }
ProtobufUtil.toServerName(getMaster()));
for (ServerName backup : getBackupMasters()) { if (master != null){
builder.setMaster(ProtobufUtil.toServerName(getMaster()));
}
if (backupMasters != null) {
for (ServerName backup : backupMasters) {
builder.addBackupMasters(ProtobufUtil.toServerName(backup)); builder.addBackupMasters(ProtobufUtil.toServerName(backup));
} }
}
if (balancerOn != null){
builder.setBalancerOn(balancerOn); builder.setBalancerOn(balancerOn);
}
return builder.build(); return builder.build();
} }
@ -318,29 +347,51 @@ public class ClusterStatus extends VersionedWritable {
* @return the converted ClusterStatus * @return the converted ClusterStatus
*/ */
public static ClusterStatus convert(ClusterStatusProtos.ClusterStatus proto) { public static ClusterStatus convert(ClusterStatusProtos.ClusterStatus proto) {
Map<ServerName, ServerLoad> servers = new HashMap<ServerName, ServerLoad>();
Map<ServerName, ServerLoad> servers = null;
if (proto.getLiveServersList() != null) {
servers = new HashMap<ServerName, ServerLoad>(proto.getLiveServersList().size());
for (LiveServerInfo lsi : proto.getLiveServersList()) { for (LiveServerInfo lsi : proto.getLiveServersList()) {
servers.put(ProtobufUtil.toServerName(lsi.getServer()), new ServerLoad(lsi.getServerLoad())); servers.put(ProtobufUtil.toServerName(
lsi.getServer()), new ServerLoad(lsi.getServerLoad()));
} }
Collection<ServerName> deadServers = new LinkedList<ServerName>(); }
Collection<ServerName> deadServers = null;
if (proto.getDeadServersList() != null) {
deadServers = new ArrayList<ServerName>(proto.getDeadServersList().size());
for (HBaseProtos.ServerName sn : proto.getDeadServersList()) { for (HBaseProtos.ServerName sn : proto.getDeadServersList()) {
deadServers.add(ProtobufUtil.toServerName(sn)); deadServers.add(ProtobufUtil.toServerName(sn));
} }
Collection<ServerName> backupMasters = new LinkedList<ServerName>(); }
Collection<ServerName> backupMasters = null;
if (proto.getBackupMastersList() != null) {
backupMasters = new ArrayList<ServerName>(proto.getBackupMastersList().size());
for (HBaseProtos.ServerName sn : proto.getBackupMastersList()) { for (HBaseProtos.ServerName sn : proto.getBackupMastersList()) {
backupMasters.add(ProtobufUtil.toServerName(sn)); backupMasters.add(ProtobufUtil.toServerName(sn));
} }
final Map<String, RegionState> rit = new HashMap<String, RegionState>(); }
Map<String, RegionState> rit = null;
if (proto.getRegionsInTransitionList() != null) {
rit = new HashMap<String, RegionState>(proto.getRegionsInTransitionList().size());
for (RegionInTransition region : proto.getRegionsInTransitionList()) { for (RegionInTransition region : proto.getRegionsInTransitionList()) {
String key = new String(region.getSpec().getValue().toByteArray()); String key = new String(region.getSpec().getValue().toByteArray());
RegionState value = RegionState.convert(region.getRegionState()); RegionState value = RegionState.convert(region.getRegionState());
rit.put(key,value); rit.put(key, value);
} }
}
String[] masterCoprocessors = null;
if (proto.getMasterCoprocessorsList() != null) {
final int numMasterCoprocessors = proto.getMasterCoprocessorsCount(); final int numMasterCoprocessors = proto.getMasterCoprocessorsCount();
final String[] masterCoprocessors = new String[numMasterCoprocessors]; masterCoprocessors = new String[numMasterCoprocessors];
for (int i = 0; i < numMasterCoprocessors; i++) { for (int i = 0; i < numMasterCoprocessors; i++) {
masterCoprocessors[i] = proto.getMasterCoprocessors(i).getName(); masterCoprocessors[i] = proto.getMasterCoprocessors(i).getName();
} }
}
return new ClusterStatus(proto.getHbaseVersion().getVersion(), return new ClusterStatus(proto.getHbaseVersion().getVersion(),
ClusterId.convert(proto.getClusterId()).toString(),servers,deadServers, ClusterId.convert(proto.getClusterId()).toString(),servers,deadServers,
ProtobufUtil.toServerName(proto.getMaster()),backupMasters,rit,masterCoprocessors, ProtobufUtil.toServerName(proto.getMaster()),backupMasters,rit,masterCoprocessors,

View File

@ -155,7 +155,7 @@ public class HBaseAdmin implements Abortable, Closeable {
* @param c Configuration object. Copied internally. * @param c Configuration object. Copied internally.
*/ */
public HBaseAdmin(Configuration c) public HBaseAdmin(Configuration c)
throws MasterNotRunningException, ZooKeeperConnectionException { throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
// Will not leak connections, as the new implementation of the constructor // Will not leak connections, as the new implementation of the constructor
// does not throw exceptions anymore. // does not throw exceptions anymore.
this(HConnectionManager.getConnection(new Configuration(c))); this(HConnectionManager.getConnection(new Configuration(c)));
@ -554,8 +554,6 @@ public class HBaseAdmin implements Abortable, Closeable {
}); });
// Wait until all regions deleted // Wait until all regions deleted
ClientProtocol server =
connection.getClient(firstMetaServer.getServerName());
for (int tries = 0; tries < (this.numRetries * this.retryLongerMultiplier); tries++) { for (int tries = 0; tries < (this.numRetries * this.retryLongerMultiplier); tries++) {
try { try {
@ -565,6 +563,7 @@ public class HBaseAdmin implements Abortable, Closeable {
firstMetaServer.getRegionInfo().getRegionName(), scan, 1, true); firstMetaServer.getRegionInfo().getRegionName(), scan, 1, true);
Result[] values = null; Result[] values = null;
// Get a batch at a time. // Get a batch at a time.
ClientProtocol server = connection.getClient(firstMetaServer.getServerName());
try { try {
ScanResponse response = server.scan(null, request); ScanResponse response = server.scan(null, request);
values = ResponseConverter.getResults(response); values = ResponseConverter.getResults(response);
@ -1934,7 +1933,7 @@ public class HBaseAdmin implements Abortable, Closeable {
* @throws ZooKeeperConnectionException if unable to connect to zookeeper * @throws ZooKeeperConnectionException if unable to connect to zookeeper
*/ */
public static void checkHBaseAvailable(Configuration conf) public static void checkHBaseAvailable(Configuration conf)
throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException { throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException {
Configuration copyOfConf = HBaseConfiguration.create(conf); Configuration copyOfConf = HBaseConfiguration.create(conf);
// We set it to make it fail as soon as possible if HBase is not available // We set it to make it fail as soon as possible if HBase is not available
@ -2435,7 +2434,7 @@ public class HBaseAdmin implements Abortable, Closeable {
* Execute Restore/Clone snapshot and wait for the server to complete (blocking). * Execute Restore/Clone snapshot and wait for the server to complete (blocking).
* To check if the cloned table exists, use {@link #isTableAvailable} -- it is not safe to * To check if the cloned table exists, use {@link #isTableAvailable} -- it is not safe to
* create an HTable instance to this table before it is available. * create an HTable instance to this table before it is available.
* @param snapshot snapshot to restore * @param snapshotName snapshot to restore
* @param tableName table name to restore the snapshot on * @param tableName table name to restore the snapshot on
* @throws IOException if a remote or network exception occurs * @throws IOException if a remote or network exception occurs
* @throws RestoreSnapshotException if snapshot failed to be restored * @throws RestoreSnapshotException if snapshot failed to be restored

View File

@ -403,11 +403,12 @@ public interface HConnection extends Abortable, Closeable {
*/ */
public boolean isClosed(); public boolean isClosed();
/** /**
* Clear any caches that pertain to server name <code>sn</code> * Clear any caches that pertain to server name <code>sn</code>
* @param sn A server name as hostname:port * @param sn A server name
*/ */
public void clearCaches(final String sn); public void clearCaches(final ServerName sn);
/** /**
* This function allows HBaseAdminProtocol and potentially others to get a shared MasterMonitor * This function allows HBaseAdminProtocol and potentially others to get a shared MasterMonitor
@ -425,5 +426,11 @@ public interface HConnection extends Abortable, Closeable {
* @throws MasterNotRunningException * @throws MasterNotRunningException
*/ */
public MasterAdminKeepAliveConnection getKeepAliveMasterAdmin() throws MasterNotRunningException; public MasterAdminKeepAliveConnection getKeepAliveMasterAdmin() throws MasterNotRunningException;
/**
* @param serverName
* @return true if the server is known as dead, false otherwise.
*/
public boolean isDeadServer(ServerName serverName);
} }

View File

@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException; import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
import org.apache.hadoop.hbase.exceptions.MasterNotRunningException; import org.apache.hadoop.hbase.exceptions.MasterNotRunningException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException;
import org.apache.hadoop.hbase.exceptions.TableNotFoundException; import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException; import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.ipc.HBaseClientRPC; import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
@ -202,7 +203,7 @@ public class HConnectionManager {
* @throws ZooKeeperConnectionException * @throws ZooKeeperConnectionException
*/ */
public static HConnection getConnection(Configuration conf) public static HConnection getConnection(Configuration conf)
throws ZooKeeperConnectionException { throws IOException {
HConnectionKey connectionKey = new HConnectionKey(conf); HConnectionKey connectionKey = new HConnectionKey(conf);
synchronized (HBASE_INSTANCES) { synchronized (HBASE_INSTANCES) {
HConnectionImplementation connection = HBASE_INSTANCES.get(connectionKey); HConnectionImplementation connection = HBASE_INSTANCES.get(connectionKey);
@ -230,7 +231,7 @@ public class HConnectionManager {
* @throws ZooKeeperConnectionException * @throws ZooKeeperConnectionException
*/ */
public static HConnection createConnection(Configuration conf) public static HConnection createConnection(Configuration conf)
throws ZooKeeperConnectionException { throws IOException {
return new HConnectionImplementation(conf, false); return new HConnectionImplementation(conf, false);
} }
@ -260,7 +261,6 @@ public class HConnectionManager {
/** /**
* Delete information for all connections. * Delete information for all connections.
* @throws IOException
*/ */
public static void deleteAllConnections() { public static void deleteAllConnections() {
synchronized (HBASE_INSTANCES) { synchronized (HBASE_INSTANCES) {
@ -520,6 +520,9 @@ public class HConnectionManager {
private volatile boolean closed; private volatile boolean closed;
private volatile boolean aborted; private volatile boolean aborted;
// package protected for the tests
ClusterStatusListener clusterStatusListener;
private final Object metaRegionLock = new Object(); private final Object metaRegionLock = new Object();
private final Object userRegionLock = new Object(); private final Object userRegionLock = new Object();
@ -558,8 +561,8 @@ public class HConnectionManager {
// entry in cachedRegionLocations that map to this server; but the absence // entry in cachedRegionLocations that map to this server; but the absence
// of a server in this map guarentees that there is no entry in cache that // of a server in this map guarentees that there is no entry in cache that
// maps to the absent server. // maps to the absent server.
private final Set<String> cachedServers = // The access to this attribute must be protected by a lock on cachedRegionLocations
new HashSet<String>(); private final Set<ServerName> cachedServers = new HashSet<ServerName>();
// region cache prefetch is enabled by default. this set contains all // region cache prefetch is enabled by default. this set contains all
// tables whose region cache prefetch are disabled. // tables whose region cache prefetch are disabled.
@ -575,8 +578,7 @@ public class HConnectionManager {
* @param conf Configuration object * @param conf Configuration object
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public HConnectionImplementation(Configuration conf, boolean managed) public HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
throws ZooKeeperConnectionException {
this.conf = conf; this.conf = conf;
this.managed = managed; this.managed = managed;
String adminClassName = conf.get(REGION_PROTOCOL_CLASS, String adminClassName = conf.get(REGION_PROTOCOL_CLASS,
@ -613,10 +615,29 @@ public class HConnectionManager {
HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT); HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
retrieveClusterId(); retrieveClusterId();
// ProtobufRpcClientEngine is the main RpcClientEngine implementation, // ProtobufRpcClientEngine is the main RpcClientEngine implementation,
// but we maintain access through an interface to allow overriding for tests // but we maintain access through an interface to allow overriding for tests
// RPC engine setup must follow obtaining the cluster ID for token authentication to work // RPC engine setup must follow obtaining the cluster ID for token authentication to work
this.rpcEngine = new ProtobufRpcClientEngine(this.conf, this.clusterId); this.rpcEngine = new ProtobufRpcClientEngine(this.conf, this.clusterId);
// Do we publish the status?
Class<? extends ClusterStatusListener.Listener> listenerClass =
conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
ClusterStatusListener.Listener.class);
if (listenerClass != null) {
clusterStatusListener = new ClusterStatusListener(
new ClusterStatusListener.DeadServerHandler() {
@Override
public void newDead(ServerName sn) {
clearCaches(sn);
rpcEngine.getClient().cancelConnections(sn.getHostname(), sn.getPort(), null);
}
}, conf, listenerClass);
}
} }
/** /**
@ -755,7 +776,7 @@ public class HConnectionManager {
// tries at this point is 1 or more; decrement to start from 0. // tries at this point is 1 or more; decrement to start from 0.
long pauseTime = ConnectionUtils.getPauseTime(this.pause, tries - 1); long pauseTime = ConnectionUtils.getPauseTime(this.pause, tries - 1);
LOG.info("getMaster attempt " + tries + " of " + numRetries + LOG.info("getMaster attempt " + tries + " of " + numRetries +
" failed; retrying after sleep of " +pauseTime, exceptionCaught); " failed; retrying after sleep of " +pauseTime + ", exception=" + exceptionCaught);
try { try {
Thread.sleep(pauseTime); Thread.sleep(pauseTime);
@ -922,12 +943,22 @@ public class HConnectionManager {
} }
} }
@Override @Override
public HRegionLocation locateRegion(final byte[] regionName) throws IOException { public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
return locateRegion(HRegionInfo.getTableName(regionName), return locateRegion(HRegionInfo.getTableName(regionName),
HRegionInfo.getStartKey(regionName), false, true); HRegionInfo.getStartKey(regionName), false, true);
} }
@Override
public boolean isDeadServer(ServerName sn) {
if (clusterStatusListener == null) {
return false;
} else {
return clusterStatusListener.isDeadServer(sn);
}
}
@Override @Override
public List<HRegionLocation> locateRegions(final byte[] tableName) public List<HRegionLocation> locateRegions(final byte[] tableName)
throws IOException { throws IOException {
@ -1087,10 +1118,9 @@ public class HConnectionManager {
metaLocation = locateRegion(parentTable, metaKey, true, false); metaLocation = locateRegion(parentTable, metaKey, true, false);
// If null still, go around again. // If null still, go around again.
if (metaLocation == null) continue; if (metaLocation == null) continue;
ClientProtocol server = ClientProtocol server = getClient(metaLocation.getServerName());
getClient(metaLocation.getServerName());
Result regionInfoRow = null; Result regionInfoRow;
// This block guards against two threads trying to load the meta // This block guards against two threads trying to load the meta
// region at the same time. The first will load the meta region and // region at the same time. The first will load the meta region and
// the second will use the value that the first one found. // the second will use the value that the first one found.
@ -1157,6 +1187,12 @@ public class HConnectionManager {
Bytes.toStringBinary(row)); Bytes.toStringBinary(row));
} }
if (isDeadServer(serverName)){
throw new RegionServerStoppedException(".META. says the region "+
regionInfo.getRegionNameAsString()+" is managed by the server " + serverName +
", but it is dead.");
}
// Instantiate the location // Instantiate the location
location = new HRegionLocation(regionInfo, serverName, location = new HRegionLocation(regionInfo, serverName,
HRegionInfo.getSeqNumDuringOpen(regionInfoRow)); HRegionInfo.getSeqNumDuringOpen(regionInfoRow));
@ -1269,41 +1305,33 @@ public class HConnectionManager {
if ((rl != null) && LOG.isDebugEnabled()) { if ((rl != null) && LOG.isDebugEnabled()) {
LOG.debug("Removed " + rl.getHostname() + ":" + rl.getPort() LOG.debug("Removed " + rl.getHostname() + ":" + rl.getPort()
+ " as a location of " + rl.getRegionInfo().getRegionNameAsString() + + " as a location of " + rl.getRegionInfo().getRegionNameAsString() +
" for tableName=" + Bytes.toString(tableName) + " for tableName=" + Bytes.toString(tableName) + " from cache");
" from cache to make sure we don't use cache for " + Bytes.toStringBinary(row));
} }
} }
@Override
public void clearCaches(String sn) {
clearCachedLocationForServer(sn);
}
/* /*
* Delete all cached entries of a table that maps to a specific location. * Delete all cached entries of a table that maps to a specific location.
*
* @param tablename
* @param server
*/ */
private void clearCachedLocationForServer(final String server) { @Override
public void clearCaches(final ServerName serverName){
boolean deletedSomething = false; boolean deletedSomething = false;
synchronized (this.cachedRegionLocations) { synchronized (this.cachedRegionLocations) {
if (!cachedServers.contains(server)) { if (!cachedServers.contains(serverName)) {
return; return;
} }
for (Map<byte[], HRegionLocation> tableLocations : for (Map<byte[], HRegionLocation> tableLocations :
cachedRegionLocations.values()) { cachedRegionLocations.values()) {
for (Entry<byte[], HRegionLocation> e : tableLocations.entrySet()) { for (Entry<byte[], HRegionLocation> e : tableLocations.entrySet()) {
if (e.getValue().getHostnamePort().equals(server)) { if (serverName.equals(e.getValue().getServerName())) {
tableLocations.remove(e.getKey()); tableLocations.remove(e.getKey());
deletedSomething = true; deletedSomething = true;
} }
} }
} }
cachedServers.remove(server); cachedServers.remove(serverName);
} }
if (deletedSomething && LOG.isDebugEnabled()) { if (deletedSomething && LOG.isDebugEnabled()) {
LOG.debug("Removed all cached region locations that map to " + server); LOG.debug("Removed all cached region locations that map to " + serverName);
} }
} }
@ -1359,7 +1387,7 @@ public class HConnectionManager {
boolean isStaleUpdate = false; boolean isStaleUpdate = false;
HRegionLocation oldLocation = null; HRegionLocation oldLocation = null;
synchronized (this.cachedRegionLocations) { synchronized (this.cachedRegionLocations) {
cachedServers.add(location.getHostnamePort()); cachedServers.add(location.getServerName());
oldLocation = tableLocations.get(startKey); oldLocation = tableLocations.get(startKey);
isNewCacheEntry = (oldLocation == null); isNewCacheEntry = (oldLocation == null);
// If the server in cache sends us a redirect, assume it's always valid. // If the server in cache sends us a redirect, assume it's always valid.
@ -1414,6 +1442,9 @@ public class HConnectionManager {
@Override @Override
public ClientProtocol getClient(final ServerName serverName) public ClientProtocol getClient(final ServerName serverName)
throws IOException { throws IOException {
if (isDeadServer(serverName)){
throw new RegionServerStoppedException("The server " + serverName + " is dead.");
}
return (ClientProtocol) return (ClientProtocol)
getProtocol(serverName.getHostname(), serverName.getPort(), clientClass); getProtocol(serverName.getHostname(), serverName.getPort(), clientClass);
} }
@ -1429,6 +1460,9 @@ public class HConnectionManager {
@Override @Override
public AdminProtocol getAdmin(final ServerName serverName, final boolean master) public AdminProtocol getAdmin(final ServerName serverName, final boolean master)
throws IOException { throws IOException {
if (isDeadServer(serverName)){
throw new RegionServerStoppedException("The server " + serverName + " is dead.");
}
return (AdminProtocol)getProtocol( return (AdminProtocol)getProtocol(
serverName.getHostname(), serverName.getPort(), adminClass); serverName.getHostname(), serverName.getPort(), adminClass);
} }
@ -1997,7 +2031,7 @@ public class HConnectionManager {
if (LOG.isTraceEnabled() && (sleepTime > 0)) { if (LOG.isTraceEnabled() && (sleepTime > 0)) {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
for (Action<R> action : e.getValue().allActions()) { for (Action<R> action : e.getValue().allActions()) {
sb.append(Bytes.toStringBinary(action.getAction().getRow()) + ";"); sb.append(Bytes.toStringBinary(action.getAction().getRow())).append(';');
} }
LOG.trace("Sending requests to [" + e.getKey().getHostnamePort() LOG.trace("Sending requests to [" + e.getKey().getHostnamePort()
+ "] with delay of [" + sleepTime + "] for rows [" + sb.toString() + "]"); + "] with delay of [" + sleepTime + "] for rows [" + sb.toString() + "]");
@ -2391,6 +2425,9 @@ public class HConnectionManager {
closeZooKeeperWatcher(); closeZooKeeperWatcher();
this.servers.clear(); this.servers.clear();
this.rpcEngine.close(); this.rpcEngine.close();
if (clusterStatusListener != null) {
clusterStatusListener.close();
}
this.closed = true; this.closed = true;
} }

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
@ -53,6 +55,8 @@ import java.util.concurrent.Callable;
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Stable @InterfaceStability.Stable
public abstract class ServerCallable<T> implements Callable<T> { public abstract class ServerCallable<T> implements Callable<T> {
static final Log LOG = LogFactory.getLog(ServerCallable.class);
protected final HConnection connection; protected final HConnection connection;
protected final byte [] tableName; protected final byte [] tableName;
protected final byte [] row; protected final byte [] row;
@ -62,6 +66,7 @@ public abstract class ServerCallable<T> implements Callable<T> {
protected long globalStartTime; protected long globalStartTime;
protected long startTime, endTime; protected long startTime, endTime;
protected final static int MIN_RPC_TIMEOUT = 2000; protected final static int MIN_RPC_TIMEOUT = 2000;
protected final static int MIN_WAIT_DEAD_SERVER = 10000;
/** /**
* @param connection Connection to use. * @param connection Connection to use.
@ -154,32 +159,46 @@ public abstract class ServerCallable<T> implements Callable<T> {
List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions = List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>(); new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis(); this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
for (int tries = 0; tries < numRetries; tries++) { for (int tries = 0;; tries++) {
long expectedSleep = 0;
try { try {
beforeCall(); beforeCall();
connect(tries != 0); connect(tries != 0); // if called with false, check table status on ZK
return call(); return call();
} catch (Throwable t) { } catch (Throwable t) {
LOG.warn("Received exception, tries=" + tries + ", numRetries=" + numRetries +
" message=" + t.getMessage());
t = translateException(t); t = translateException(t);
// translateException throws an exception when we should not retry, i.e. when it's the
// request that is bad.
if (t instanceof SocketTimeoutException || if (t instanceof SocketTimeoutException ||
t instanceof ConnectException || t instanceof ConnectException ||
t instanceof RetriesExhaustedException) { t instanceof RetriesExhaustedException ||
getConnection().isDeadServer(location.getServerName())) {
// if thrown these exceptions, we clear all the cache entries that // if thrown these exceptions, we clear all the cache entries that
// map to that slow/dead server; otherwise, let cache miss and ask // map to that slow/dead server; otherwise, let cache miss and ask
// .META. again to find the new location // .META. again to find the new location
HRegionLocation hrl = location; getConnection().clearCaches(location.getServerName());
if (hrl != null) {
getConnection().clearCaches(hrl.getHostnamePort());
}
} }
RetriesExhaustedException.ThrowableWithExtraContext qt = RetriesExhaustedException.ThrowableWithExtraContext qt =
new RetriesExhaustedException.ThrowableWithExtraContext(t, new RetriesExhaustedException.ThrowableWithExtraContext(t,
EnvironmentEdgeManager.currentTimeMillis(), toString()); EnvironmentEdgeManager.currentTimeMillis(), toString());
exceptions.add(qt); exceptions.add(qt);
if (tries == numRetries - 1) { if (tries >= numRetries - 1) {
throw new RetriesExhaustedException(tries, exceptions); throw new RetriesExhaustedException(tries, exceptions);
} }
long expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
// If the server is dead, we need to wait a little before retrying, to give
// a chance to the regions to be
expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
if (expectedSleep < MIN_WAIT_DEAD_SERVER &&
getConnection().isDeadServer(location.getServerName())){
expectedSleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f);
}
// If, after the planned sleep, there won't be enough time left, we stop now. // If, after the planned sleep, there won't be enough time left, we stop now.
if (((this.endTime - this.globalStartTime) + MIN_RPC_TIMEOUT + expectedSleep) > if (((this.endTime - this.globalStartTime) + MIN_RPC_TIMEOUT + expectedSleep) >
this.callTimeout) { this.callTimeout) {
@ -193,13 +212,12 @@ public abstract class ServerCallable<T> implements Callable<T> {
afterCall(); afterCall();
} }
try { try {
Thread.sleep(ConnectionUtils.getPauseTime(pause, tries)); Thread.sleep(expectedSleep);
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw new IOException("Interrupted after tries=" + tries, e); throw new IOException("Interrupted after " + tries + " tries on " + numRetries, e);
} }
} }
return null;
} }
/** /**
@ -210,6 +228,7 @@ public abstract class ServerCallable<T> implements Callable<T> {
*/ */
public T withoutRetries() public T withoutRetries()
throws IOException, RuntimeException { throws IOException, RuntimeException {
// The code of this method should be shared with withRetries.
this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis(); this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
try { try {
beforeCall(); beforeCall();
@ -217,6 +236,7 @@ public abstract class ServerCallable<T> implements Callable<T> {
return call(); return call();
} catch (Throwable t) { } catch (Throwable t) {
Throwable t2 = translateException(t); Throwable t2 = translateException(t);
// It would be nice to clear the location cache here.
if (t2 instanceof IOException) { if (t2 instanceof IOException) {
throw (IOException)t2; throw (IOException)t2;
} else { } else {
@ -227,7 +247,13 @@ public abstract class ServerCallable<T> implements Callable<T> {
} }
} }
protected static Throwable translateException(Throwable t) throws IOException { /**
* Get the good or the remote exception if any, throws the DoNotRetryIOException.
* @param t the throwable to analyze
* @return the translated exception, if it's not a DoNotRetryIOException
* @throws DoNotRetryIOException - if we find it, we throw it instead of translating.
*/
protected static Throwable translateException(Throwable t) throws DoNotRetryIOException {
if (t instanceof UndeclaredThrowableException) { if (t instanceof UndeclaredThrowableException) {
t = t.getCause(); t = t.getCause();
} }

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.IpcProtocol; import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
@ -184,7 +185,6 @@ public class HBaseClient {
return false; return false;
} }
} }
public static class FailedServerException extends IOException { public static class FailedServerException extends IOException {
@ -1340,6 +1340,30 @@ public class HBaseClient {
} }
} }
/**
* Interrupt the connections to the given ip:port server. This should be called if the server
* is known as actually dead. This will not prevent current operation to be retried, and,
* depending on their own behavior, they may retry on the same server. This can be a feature,
* for example at startup. In any case, they're likely to get connection refused (if the
* process died) or no route to host: i.e. there next retries should be faster and with a
* safe exception.
*/
public void cancelConnections(String hostname, int port, IOException ioe) {
synchronized (connections) {
for (Connection connection : connections.values()) {
if (connection.isAlive() &&
connection.getRemoteAddress().getPort() == port &&
connection.getRemoteAddress().getHostName().equals(hostname)) {
LOG.info("The server on " + hostname + ":" + port +
" is dead - stopping the connection " + connection.remoteId);
connection.closeConnection();
// We could do a connection.interrupt(), but it's safer not to do it, as the
// interrupted exception behavior is not defined nor enforced enough.
}
}
}
}
/** Makes a set of calls in parallel. Each parameter is sent to the /** Makes a set of calls in parallel. Each parameter is sent to the
* corresponding address. When all values are available, or have timed out * corresponding address. When all values are available, or have timed out
* or errored, the collected results are returned in an array. The array * or errored, the collected results are returned in an array. The array

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
@ -108,7 +109,8 @@ public class HBaseClientRPC {
try { try {
Thread.sleep(1000); Thread.sleep(1000);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
// IGNORE Thread.interrupted();
throw new InterruptedIOException();
} }
} }
} }

View File

@ -42,12 +42,17 @@ public class ProtobufRpcClientEngine implements RpcClientEngine {
private static final Log LOG = private static final Log LOG =
LogFactory.getLog("org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine"); LogFactory.getLog("org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine");
public HBaseClient getClient() {
return client;
}
protected HBaseClient client; protected HBaseClient client;
public ProtobufRpcClientEngine(Configuration conf, String clusterId) { public ProtobufRpcClientEngine(Configuration conf, String clusterId) {
this.client = new HBaseClient(conf, clusterId); this.client = new HBaseClient(conf, clusterId);
} }
@Override @Override
public <T extends IpcProtocol> T getProxy( public <T extends IpcProtocol> T getProxy(
Class<T> protocol, InetSocketAddress addr, Class<T> protocol, InetSocketAddress addr,

View File

@ -35,4 +35,6 @@ public interface RpcClientEngine {
/** Shutdown this instance */ /** Shutdown this instance */
void close(); void close();
public HBaseClient getClient();
} }

View File

@ -233,7 +233,7 @@ public class RecoverableZooKeeper {
private void retryOrThrow(RetryCounter retryCounter, KeeperException e, private void retryOrThrow(RetryCounter retryCounter, KeeperException e,
String opName) throws KeeperException { String opName) throws KeeperException {
LOG.warn("Possibly transient ZooKeeper exception: " + e); LOG.warn("Possibly transient ZooKeeper, quorum=" + quorumServers + ", exception=" + e);
if (!retryCounter.shouldRetry()) { if (!retryCounter.shouldRetry()) {
LOG.error("ZooKeeper " + opName + " failed after " LOG.error("ZooKeeper " + opName + " failed after "
+ retryCounter.getMaxRetries() + " retries"); + retryCounter.getMaxRetries() + " retries");

View File

@ -808,6 +808,23 @@ public final class HConstants {
"hbase.node.health.failure.threshold"; "hbase.node.health.failure.threshold";
public static final int DEFAULT_HEALTH_FAILURE_THRESHOLD = 3; public static final int DEFAULT_HEALTH_FAILURE_THRESHOLD = 3;
/**
* IP to use for the multicast status messages between the master and the clients.
* The default address is chosen as one among others within the ones suitable for multicast
* messages.
*/
public static final String STATUS_MULTICAST_ADDRESS = "hbase.status.multicast.address.ip";
public static final String DEFAULT_STATUS_MULTICAST_ADDRESS = "226.1.1.3";
/**
* The port to use for the multicast messages.
*/
public static final String STATUS_MULTICAST_PORT = "hbase.status.multicast.port";
public static final int DEFAULT_STATUS_MULTICAST_PORT = 60100;
private HConstants() { private HConstants() {
// Can't be instantiated with this ctor. // Can't be instantiated with this ctor.
} }

View File

@ -1898,7 +1898,8 @@ public class AssignmentManager extends ZooKeeperListener {
if (existingPlan != null && existingPlan.getDestination() != null) { if (existingPlan != null && existingPlan.getDestination() != null) {
LOG.debug("Found an existing plan for " + region.getRegionNameAsString() LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
+ " destination server is " + existingPlan.getDestination()); + " destination server is " + existingPlan.getDestination() +
" accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
} }
if (forceNewPlan if (forceNewPlan
@ -1918,7 +1919,8 @@ public class AssignmentManager extends ZooKeeperListener {
" so generated a random one; " + randomPlan + "; " + " so generated a random one; " + randomPlan + "; " +
serverManager.countOfRegionServers() + serverManager.countOfRegionServers() +
" (online=" + serverManager.getOnlineServers().size() + " (online=" + serverManager.getOnlineServers().size() +
", available=" + destServers.size() + ") available servers"); ", available=" + destServers.size() + ") available servers" +
", forceNewPlan=" + forceNewPlan);
return randomPlan; return randomPlan;
} }
LOG.debug("Using pre-existing plan for region " + LOG.debug("Using pre-existing plan for region " +

View File

@ -74,8 +74,8 @@ public class DeadServer {
} }
/** /**
* @param serverName * @param serverName server name.
* @return true if this server is on the dead servers list. * @return true if this server is on the dead servers list false otherwise
*/ */
public synchronized boolean isDeadServer(final ServerName serverName) { public synchronized boolean isDeadServer(final ServerName serverName) {
return deadServers.containsKey(serverName); return deadServers.containsKey(serverName);

View File

@ -305,6 +305,7 @@ Server {
private LoadBalancer balancer; private LoadBalancer balancer;
private Thread balancerChore; private Thread balancerChore;
private Thread clusterStatusChore; private Thread clusterStatusChore;
private ClusterStatusPublisher clusterStatusPublisherChore = null;
private CatalogJanitor catalogJanitorChore; private CatalogJanitor catalogJanitorChore;
private LogCleaner logCleaner; private LogCleaner logCleaner;
@ -429,12 +430,23 @@ Server {
if (isHealthCheckerConfigured()) { if (isHealthCheckerConfigured()) {
healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration()); healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
} }
// Do we publish the status?
Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
ClusterStatusPublisher.Publisher.class);
if (publisherClass != null) {
clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
Threads.setDaemonThreadRunning(clusterStatusPublisherChore.getThread());
}
} }
/** /**
* Stall startup if we are designated a backup master; i.e. we want someone * Stall startup if we are designated a backup master; i.e. we want someone
* else to become the master before proceeding. * else to become the master before proceeding.
* @param c * @param c configuration
* @param amm * @param amm
* @throws InterruptedException * @throws InterruptedException
*/ */
@ -1080,6 +1092,9 @@ Server {
if (this.catalogJanitorChore != null) { if (this.catalogJanitorChore != null) {
this.catalogJanitorChore.interrupt(); this.catalogJanitorChore.interrupt();
} }
if (this.clusterStatusPublisherChore != null){
clusterStatusPublisherChore.interrupt();
}
} }
@Override @Override
@ -2539,7 +2554,7 @@ Server {
* No exceptions are thrown if the restore is not running, the result will be "done". * No exceptions are thrown if the restore is not running, the result will be "done".
* *
* @return done <tt>true</tt> if the restore/clone operation is completed. * @return done <tt>true</tt> if the restore/clone operation is completed.
* @throws RestoreSnapshotExcepton if the operation failed. * @throws ServiceException if the operation failed.
*/ */
@Override @Override
public IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(RpcController controller, public IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(RpcController controller,

View File

@ -183,6 +183,9 @@ public class HMasterCommandLine extends ServerCommandLine {
} catch (ZooKeeperConnectionException e) { } catch (ZooKeeperConnectionException e) {
LOG.error("ZooKeeper not available"); LOG.error("ZooKeeper not available");
return -1; return -1;
} catch (IOException e) {
LOG.error("Got IOException: " +e.getMessage(), e);
return -1;
} }
try { try {
adm.shutdown(); adm.shutdown();

View File

@ -177,12 +177,12 @@ public class ServerManager {
* @throws ZooKeeperConnectionException * @throws ZooKeeperConnectionException
*/ */
public ServerManager(final Server master, final MasterServices services) public ServerManager(final Server master, final MasterServices services)
throws ZooKeeperConnectionException { throws IOException {
this(master, services, true); this(master, services, true);
} }
ServerManager(final Server master, final MasterServices services, ServerManager(final Server master, final MasterServices services,
final boolean connect) throws ZooKeeperConnectionException { final boolean connect) throws IOException {
this.master = master; this.master = master;
this.services = services; this.services = services;
Configuration c = master.getConfiguration(); Configuration c = master.getConfiguration();

View File

@ -89,8 +89,7 @@ class MemStoreFlusher implements FlushRequester {
private long blockingWaitTime; private long blockingWaitTime;
private final Counter updatesBlockedMsHighWater = new Counter(); private final Counter updatesBlockedMsHighWater = new Counter();
private FlushHandler[] flushHandlers = null; private final FlushHandler[] flushHandlers;
private int handlerCount;
/** /**
* @param conf * @param conf
@ -116,7 +115,8 @@ class MemStoreFlusher implements FlushRequester {
conf.getInt("hbase.hstore.blockingStoreFiles", HStore.DEFAULT_BLOCKING_STOREFILE_COUNT); conf.getInt("hbase.hstore.blockingStoreFiles", HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
90000); 90000);
this.handlerCount = conf.getInt("hbase.hstore.flusher.count", 1); int handlerCount = conf.getInt("hbase.hstore.flusher.count", 1);
this.flushHandlers = new FlushHandler[handlerCount];
LOG.info("globalMemStoreLimit=" + LOG.info("globalMemStoreLimit=" +
StringUtils.humanReadableInt(this.globalMemStoreLimit) + StringUtils.humanReadableInt(this.globalMemStoreLimit) +
", globalMemStoreLimitLowMark=" + ", globalMemStoreLimitLowMark=" +
@ -350,7 +350,6 @@ class MemStoreFlusher implements FlushRequester {
synchronized void start(UncaughtExceptionHandler eh) { synchronized void start(UncaughtExceptionHandler eh) {
ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory( ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
server.getServerName().toString() + "-MemStoreFlusher", eh); server.getServerName().toString() + "-MemStoreFlusher", eh);
flushHandlers = new FlushHandler[handlerCount];
for (int i = 0; i < flushHandlers.length; i++) { for (int i = 0; i < flushHandlers.length; i++) {
flushHandlers[i] = new FlushHandler(); flushHandlers[i] = new FlushHandler();
flusherThreadFactory.newThread(flushHandlers[i]); flusherThreadFactory.newThread(flushHandlers[i]);
@ -607,7 +606,7 @@ class MemStoreFlusher implements FlushRequester {
} }
/** /**
* @return Count of times {@link #resetDelay()} was called; i.e this is * @return Count of times {@link #requeue(long)} was called; i.e this is
* number of times we've been requeued. * number of times we've been requeued.
*/ */
public int getRequeueCount() { public int getRequeueCount() {

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.thrift.ThriftServerRunner.HBaseHandler; import org.apache.hadoop.hbase.thrift.ThriftServerRunner.HBaseHandler;
import org.apache.hadoop.hbase.thrift.generated.TIncrement; import org.apache.hadoop.hbase.thrift.generated.TIncrement;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.metrics.util.MBeanUtil; import org.apache.hadoop.metrics.util.MBeanUtil;
import org.apache.thrift.TException; import org.apache.thrift.TException;
@ -168,7 +169,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
pool = pool =
new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, TimeUnit.MILLISECONDS, queue, new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, TimeUnit.MILLISECONDS, queue,
new DaemonThreadFactory()); Threads.newDaemonThreadFactory("IncrementCoalescer"));
MBeanUtil.registerMBean("thrift", "Thrift", this); MBeanUtil.registerMBean("thrift", "Thrift", this);
} }

View File

@ -277,4 +277,15 @@ public abstract class HBaseCluster implements Closeable, Configurable {
*/ */
@Override @Override
public abstract void close() throws IOException; public abstract void close() throws IOException;
/**
* Wait for the namenode.
*
* @throws InterruptedException
*/
public void waitForNamenodeAvailable() throws InterruptedException {
}
public void waitForDatanodesRegistered(int nbDN) throws Exception {
}
} }

View File

@ -2354,6 +2354,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
private static final int MIN_RANDOM_PORT = 0xc000; private static final int MIN_RANDOM_PORT = 0xc000;
private static final int MAX_RANDOM_PORT = 0xfffe; private static final int MAX_RANDOM_PORT = 0xfffe;
private static Random random = new Random();
/** /**
* Returns a random port. These ports cannot be registered with IANA and are * Returns a random port. These ports cannot be registered with IANA and are
@ -2361,7 +2362,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
*/ */
public static int randomPort() { public static int randomPort() {
return MIN_RANDOM_PORT return MIN_RANDOM_PORT
+ new Random().nextInt(MAX_RANDOM_PORT - MIN_RANDOM_PORT); + random.nextInt(MAX_RANDOM_PORT - MIN_RANDOM_PORT);
} }
/** /**
@ -2387,6 +2388,13 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
return port; return port;
} }
public static String randomMultiCastAddress() {
return "226.1.1." + random.nextInt(254);
}
public static void waitForHostPort(String host, int port) public static void waitForHostPort(String host, int port)
throws IOException { throws IOException {
final int maxTimeMs = 10000; final int maxTimeMs = 10000;

View File

@ -74,7 +74,7 @@ public class TestMultiVersions {
@Before @Before
public void before() public void before()
throws MasterNotRunningException, ZooKeeperConnectionException { throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
this.admin = new HBaseAdmin(UTIL.getConfiguration()); this.admin = new HBaseAdmin(UTIL.getConfiguration());
} }

View File

@ -131,7 +131,7 @@ public class HConnectionTestingUtility {
* {http://mockito.googlecode.com/svn/branches/1.6/javadoc/org/mockito/Mockito.html#spy(T)} * {http://mockito.googlecode.com/svn/branches/1.6/javadoc/org/mockito/Mockito.html#spy(T)}
*/ */
public static HConnection getSpiedConnection(final Configuration conf) public static HConnection getSpiedConnection(final Configuration conf)
throws ZooKeeperConnectionException { throws IOException {
HConnectionKey connectionKey = new HConnectionKey(conf); HConnectionKey connectionKey = new HConnectionKey(conf);
synchronized (HConnectionManager.HBASE_INSTANCES) { synchronized (HConnectionManager.HBASE_INSTANCES) {
HConnectionImplementation connection = HConnectionImplementation connection =

View File

@ -1592,6 +1592,7 @@ public class TestAdmin {
} catch (MasterNotRunningException ignored) { } catch (MasterNotRunningException ignored) {
} catch (ZooKeeperConnectionException ignored) { } catch (ZooKeeperConnectionException ignored) {
} catch (ServiceException ignored) { } catch (ServiceException ignored) {
} catch (IOException ignored) {
} }
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.lang.reflect.Modifier; import java.lang.reflect.Modifier;
import java.util.ArrayList; import java.util.ArrayList;
@ -44,13 +45,17 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation; import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation;
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionKey; import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionKey;
import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException;
import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException; import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.master.ClusterStatusPublisher;
import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
@ -77,6 +82,10 @@ public class TestHCM {
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
ClusterStatusPublisher.MulticastPublisher.class, ClusterStatusPublisher.Publisher.class);
TEST_UTIL.getConfiguration().setClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
ClusterStatusListener.MultiCastListener.class, ClusterStatusListener.Listener.class);
TEST_UTIL.startMiniCluster(2); TEST_UTIL.startMiniCluster(2);
} }
@ -88,7 +97,7 @@ public class TestHCM {
public static void createNewConfigurations() throws SecurityException, public static void createNewConfigurations() throws SecurityException,
IllegalArgumentException, NoSuchFieldException, IllegalArgumentException, NoSuchFieldException,
IllegalAccessException, InterruptedException, ZooKeeperConnectionException { IllegalAccessException, InterruptedException, ZooKeeperConnectionException, IOException {
HConnection last = null; HConnection last = null;
for (int i = 0; i <= (HConnectionManager.MAX_CACHED_HBASE_INSTANCES * 2); i++) { for (int i = 0; i <= (HConnectionManager.MAX_CACHED_HBASE_INSTANCES * 2); i++) {
// set random key to differentiate the connection from previous ones // set random key to differentiate the connection from previous ones
@ -117,6 +126,61 @@ public class TestHCM {
return HConnectionTestingUtility.getConnectionCount(); return HConnectionTestingUtility.getConnectionCount();
} }
@Test(expected = RegionServerStoppedException.class)
public void testClusterStatus() throws Exception {
byte[] tn = "testClusterStatus".getBytes();
byte[] cf = "cf".getBytes();
byte[] rk = "rk1".getBytes();
JVMClusterUtil.RegionServerThread rs = TEST_UTIL.getHBaseCluster().startRegionServer();
rs.waitForServerOnline();
final ServerName sn = rs.getRegionServer().getServerName();
HTable t = TEST_UTIL.createTable(tn, cf);
TEST_UTIL.waitTableAvailable(tn);
while(TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().
getRegionStates().isRegionsInTransition()){
Thread.sleep(1);
}
final HConnectionImplementation hci = (HConnectionImplementation)t.getConnection();
while (t.getRegionLocation(rk).getPort() != sn.getPort()){
TEST_UTIL.getHBaseAdmin().move(t.getRegionLocation(rk).getRegionInfo().
getEncodedNameAsBytes(), sn.getVersionedBytes());
while(TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().
getRegionStates().isRegionsInTransition()){
Thread.sleep(1);
}
hci.clearRegionCache(tn);
}
Assert.assertNotNull(hci.clusterStatusListener);
TEST_UTIL.assertRegionOnServer(t.getRegionLocation(rk).getRegionInfo(), sn, 20000);
Put p1 = new Put(rk);
p1.add(cf, "qual".getBytes(), "val".getBytes());
t.put(p1);
rs.getRegionServer().abort("I'm dead");
// We want the status to be updated. That's a least 10 second
TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return TEST_UTIL.getHBaseCluster().getMaster().getServerManager().
getDeadServers().isDeadServer(sn);
}
});
TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return hci.clusterStatusListener.isDeadServer(sn);
}
});
hci.getClient(sn); // will throw an exception: RegionServerStoppedException
}
@Test @Test
public void abortingHConnectionRemovesItselfFromHCM() throws Exception { public void abortingHConnectionRemovesItselfFromHCM() throws Exception {
// Save off current HConnections // Save off current HConnections

View File

@ -163,6 +163,8 @@ public class TestFilterWithScanLimits {
assertNull("Master is not running", e); assertNull("Master is not running", e);
} catch (ZooKeeperConnectionException e) { } catch (ZooKeeperConnectionException e) {
assertNull("Cannot connect to Zookeeper", e); assertNull("Cannot connect to Zookeeper", e);
} catch (IOException e) {
assertNull("IOException", e);
} }
createTable(); createTable();
prepareData(); prepareData();

View File

@ -173,6 +173,8 @@ public class TestFilterWrapper {
assertNull("Master is not running", e); assertNull("Master is not running", e);
} catch (ZooKeeperConnectionException e) { } catch (ZooKeeperConnectionException e) {
assertNull("Cannot connect to Zookeeper", e); assertNull("Cannot connect to Zookeeper", e);
} catch (IOException e) {
assertNull("Caught IOException", e);
} }
createTable(); createTable();
prepareData(); prepareData();

View File

@ -90,7 +90,7 @@ public class TestTimeRangeMapRed {
} }
@Before @Before
public void before() throws MasterNotRunningException, ZooKeeperConnectionException { public void before() throws Exception {
this.admin = new HBaseAdmin(UTIL.getConfiguration()); this.admin = new HBaseAdmin(UTIL.getConfiguration());
} }