HBASE-2431 Master does not respect generation stamps, may result in meta getting permanently offlined

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@942318 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2010-05-08 06:27:15 +00:00
parent 6dae1e055b
commit 479c53423c
15 changed files with 679 additions and 487 deletions

View File

@ -304,6 +304,8 @@ Release 0.21.0 - Unreleased
crashes (Todd Lipcon via Stack)
HBASE-2513 hbase-2414 added bug where we'd tight-loop if no root available
HBASE-2503 PriorityQueue isn't thread safe, KeyValueHeap uses it that way
HBASE-2431 Master does not respect generation stamps, may result in meta
getting permanently offlined
IMPROVEMENTS
HBASE-1760 Cleanup TODOs in HTable

View File

@ -59,17 +59,7 @@ public class ClusterStatus extends VersionedWritable {
* Constructor, for Writable
*/
public ClusterStatus() {
}
/**
* @return the names of region servers in the cluster
*/
public Collection<String> getServerNames() {
ArrayList<String> names = new ArrayList<String>(liveServerInfo.size());
for (HServerInfo server: liveServerInfo) {
names.add(server.getName());
}
return names;
super();
}
/**

View File

@ -27,14 +27,12 @@ import java.io.IOException;
import java.net.InetSocketAddress;
/**
* HServerAddress is a "label" for a HBase server that combines the host
* name and port number.
* HServerAddress is a "label" for a HBase server made of host and port number.
*/
public class HServerAddress implements WritableComparable<HServerAddress> {
private InetSocketAddress address;
String stringValue;
/** Empty constructor, used for Writable */
public HServerAddress() {
this.address = null;
this.stringValue = null;
@ -51,9 +49,7 @@ public class HServerAddress implements WritableComparable<HServerAddress> {
}
/**
* Construct a HServerAddress from a string of the form hostname:port
*
* @param hostAndPort format 'hostname:port'
* @param hostAndPort Hostname and port formatted as <code>&lt;hostname> ':' &lt;port></code>
*/
public HServerAddress(String hostAndPort) {
int colonIndex = hostAndPort.lastIndexOf(':');
@ -68,9 +64,8 @@ public class HServerAddress implements WritableComparable<HServerAddress> {
}
/**
* Construct a HServerAddress from hostname, port number
* @param bindAddress host name
* @param port port number
* @param bindAddress Hostname
* @param port Port number
*/
public HServerAddress(String bindAddress, int port) {
this.address = new InetSocketAddress(bindAddress, port);
@ -78,48 +73,45 @@ public class HServerAddress implements WritableComparable<HServerAddress> {
}
/**
* Construct a HServerAddress from another HServerAddress
* Copy-constructor
*
* @param other the HServerAddress to copy from
* @param other HServerAddress to copy from
*/
public HServerAddress(HServerAddress other) {
String bindAddress = other.getBindAddress();
int port = other.getPort();
address = new InetSocketAddress(bindAddress, port);
this.address = new InetSocketAddress(bindAddress, port);
stringValue = bindAddress + ":" + port;
}
/** @return bind address */
/** @return Bind address */
public String getBindAddress() {
return address.getAddress().getHostAddress();
return this.address.getAddress().getHostAddress();
}
/** @return port number */
/** @return Port number */
public int getPort() {
return address.getPort();
return this.address.getPort();
}
/** @return host name */
/** @return Hostname */
public String getHostname() {
return address.getHostName();
return this.address.getHostName();
}
/** @return the InetSocketAddress */
/** @return The InetSocketAddress */
public InetSocketAddress getInetSocketAddress() {
return address;
return this.address;
}
/**
* @see java.lang.Object#toString()
* @return String formatted as <code>&lt;bind address> ':' &lt;port></code>
*/
@Override
public String toString() {
return (stringValue == null ? "" : stringValue);
return (this.stringValue == null ? "" : this.stringValue);
}
/**
* @see java.lang.Object#equals(java.lang.Object)
*/
@Override
public boolean equals(Object o) {
if (this == o) {
@ -134,9 +126,6 @@ public class HServerAddress implements WritableComparable<HServerAddress> {
return this.compareTo((HServerAddress)o) == 0;
}
/**
* @see java.lang.Object#hashCode()
*/
@Override
public int hashCode() {
int result = this.address.hashCode();

View File

@ -29,44 +29,46 @@ import org.apache.hadoop.io.WritableComparable;
/**
* HServerInfo contains metainfo about an HRegionServer, Currently it only
* contains the server start code.
*
* In the future it will contain information about the source machine and
* load statistics.
* HServerInfo is meta info about an {@link HRegionServer}.
* Holds hostname, ports, regionserver startcode, and load. Each server has
* a <code>servername</code> where servername is made up of a concatenation of
* hostname, port, and regionserver startcode.
*/
public class HServerInfo implements WritableComparable<HServerInfo> {
/**
* This character is used as separator making up the <code>servername</code>.
* Servername is made of host, port, and startcode formatted as
* <code>&lt;hostname> '{@link #SERVERNAME_SEPARATOR}' &lt;port> '{@link #SERVERNAME_SEPARATOR}' &lt;startcode></code>
* where {@link #SERVERNAME_SEPARATOR} is usually a ','.
*/
public static final String SERVERNAME_SEPARATOR = ",";
private HServerAddress serverAddress;
private long startCode;
private HServerLoad load;
private int infoPort;
// Servername is made of hostname, port and startcode.
private String serverName = null;
private String name;
// Hostname of the regionserver.
private String hostname;
private static Map<String,String> dnsCache = new HashMap<String,String>();
/** default constructor - used by Writable */
public HServerInfo() {
this(new HServerAddress(), 0,
HConstants.DEFAULT_REGIONSERVER_INFOPORT, "default name");
}
/**
* Constructor
* @param serverAddress
* @param startCode
* @param infoPort Port the info server is listening on.
*/
public HServerInfo(HServerAddress serverAddress, long startCode,
final int infoPort, String name) {
final int infoPort, String hostname) {
this.serverAddress = serverAddress;
this.startCode = startCode;
this.load = new HServerLoad();
this.infoPort = infoPort;
this.name = name;
this.hostname = hostname;
}
/**
* Construct a new object using another as input (like a copy constructor)
* Copy-constructor
* @param other
*/
public HServerInfo(HServerInfo other) {
@ -74,72 +76,53 @@ public class HServerInfo implements WritableComparable<HServerInfo> {
this.startCode = other.getStartCode();
this.load = other.getLoad();
this.infoPort = other.getInfoPort();
this.name = other.getName();
this.hostname = other.hostname;
}
/**
* @return the load
*/
public HServerLoad getLoad() {
return load;
}
/**
* @param load the load to set
*/
public void setLoad(HServerLoad load) {
this.load = load;
}
/** @return the server address */
public synchronized HServerAddress getServerAddress() {
return new HServerAddress(serverAddress);
}
/**
* Change the server address.
* @param serverAddress New server address
*/
public synchronized void setServerAddress(HServerAddress serverAddress) {
this.serverAddress = serverAddress;
this.serverName = null;
}
/** @return the server start code */
public synchronized long getStartCode() {
return startCode;
}
/**
* @return Port the info server is listening on.
*/
public int getInfoPort() {
return this.infoPort;
}
/**
* @param infoPort - new port of info server
*/
public void setInfoPort(int infoPort) {
this.infoPort = infoPort;
}
/**
* @param startCode the startCode to set
*/
public synchronized void setStartCode(long startCode) {
this.startCode = startCode;
this.serverName = null;
}
/**
* @return the server name in the form hostname_startcode_port
* @return Server name made of the concatenation of hostname, port and
* startcode formatted as <code>&lt;hostname> ',' &lt;port> ',' &lt;startcode></code>
*/
public synchronized String getServerName() {
if (this.serverName == null) {
// if we have the hostname of the RS, use it
if(this.name != null) {
this.serverName = getServerName(this.name, this.serverAddress.getPort(), this.startCode);
if(this.hostname != null) {
this.serverName =
getServerName(this.hostname, this.serverAddress.getPort(), this.startCode);
}
// go to DNS name resolution only if we don't have the name of the RS
else {
@ -150,90 +133,10 @@ public class HServerInfo implements WritableComparable<HServerInfo> {
}
/**
* Get the hostname of the server
* @return hostname
*/
public String getName() {
return name;
}
/**
* Set the hostname of the server
* @param name hostname
*/
public void setName(String name) {
this.name = name;
}
/**
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
return "address: " + this.serverAddress + ", startcode: " + this.startCode
+ ", load: (" + this.load.toString() + ")";
}
/**
* @see java.lang.Object#equals(java.lang.Object)
*/
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
return compareTo((HServerInfo)obj) == 0;
}
/**
* @see java.lang.Object#hashCode()
*/
@Override
public int hashCode() {
return this.getServerName().hashCode();
}
// Writable
public void readFields(DataInput in) throws IOException {
this.serverAddress.readFields(in);
this.startCode = in.readLong();
this.load.readFields(in);
this.infoPort = in.readInt();
this.name = in.readUTF();
}
public void write(DataOutput out) throws IOException {
this.serverAddress.write(out);
out.writeLong(this.startCode);
this.load.write(out);
out.writeInt(this.infoPort);
out.writeUTF(name);
}
public int compareTo(HServerInfo o) {
return this.getServerName().compareTo(o.getServerName());
}
/**
* @param info
* @return the server name in the form hostname_startcode_port
*/
private static String getServerName(HServerInfo info) {
return getServerName(info.getServerAddress(), info.getStartCode());
}
/**
* @param serverAddress in the form hostname:port
* @param startCode
* @return the server name in the form hostname_startcode_port
* @param serverAddress In form <code>&lt;hostname> ':' &lt;port></code>
* @param startCode Server startcode
* @return Server name made of the concatenation of hostname, port and
* startcode formatted as <code>&lt;hostname> ',' &lt;port> ',' &lt;startcode></code>
*/
public static String getServerName(String serverAddress, long startCode) {
String name = null;
@ -256,20 +159,78 @@ public class HServerInfo implements WritableComparable<HServerInfo> {
}
/**
* @param address
* @param startCode
* @return the server name in the form hostname_startcode_port
* @param address Server address
* @param startCode Server startcode
* @return Server name made of the concatenation of hostname, port and
* startcode formatted as <code>&lt;hostname> ',' &lt;port> ',' &lt;startcode></code>
*/
public static String getServerName(HServerAddress address, long startCode) {
return getServerName(address.getHostname(), address.getPort(), startCode);
}
/*
 * Builds a servername from its three components.
 * @param hostName Hostname of the server; used as the leading component.
 * @param port Port of the server address.
 * @param startCode Server startcode.
 * @return Server name made of the concatenation of hostname, port and
 * startcode formatted as <code>&lt;hostname> ',' &lt;port> ',' &lt;startcode></code>
 */
private static String getServerName(String hostName, int port, long startCode) {
  StringBuilder name = new StringBuilder(hostName);
  // Exactly one separator between components. Always go through the shared
  // constant so the format stays in step with code that parses servernames
  // by searching for SERVERNAME_SEPARATOR.
  name.append(SERVERNAME_SEPARATOR);
  name.append(port);
  name.append(SERVERNAME_SEPARATOR);
  name.append(startCode);
  return name.toString();
}
/**
 * Renders this server as its servername followed by its load.
 * @return ServerName and load concatenated.
 * @see #getServerName()
 * @see #getLoad()
 */
@Override
public String toString() {
  StringBuilder text = new StringBuilder("serverName=");
  text.append(getServerName());
  text.append(", load=(");
  text.append(this.load.toString());
  text.append(")");
  return text.toString();
}
/**
 * Two instances are equal when they are of the same class and
 * {@link #compareTo(HServerInfo)} reports no difference between them.
 * @param obj Object to compare against; may be null.
 * @return true if <code>obj</code> is an equal HServerInfo.
 */
@Override
public boolean equals(Object obj) {
  if (obj == this) {
    return true;
  }
  if (obj == null || obj.getClass() != getClass()) {
    return false;
  }
  HServerInfo that = (HServerInfo)obj;
  return compareTo(that) == 0;
}
/**
 * @return Hash of the servername, the same value that equality and ordering
 * are based on.
 */
@Override
public int hashCode() {
  final String key = this.getServerName();
  return key.hashCode();
}
/**
 * Populates this instance from <code>in</code>. Fields are read in the same
 * order that {@link #write(DataOutput)} emits them: server address,
 * startcode, load, info port and then hostname.
 * @param in Stream to read the serialized fields from.
 * @throws IOException if reading from <code>in</code> fails.
 */
public void readFields(DataInput in) throws IOException {
this.serverAddress.readFields(in);
this.startCode = in.readLong();
this.load.readFields(in);
this.infoPort = in.readInt();
this.hostname = in.readUTF();
}
/**
 * Serializes this instance to <code>out</code>. Fields are written in the
 * order that {@link #readFields(DataInput)} expects: server address,
 * startcode, load, info port and then hostname.
 * @param out Stream to write the serialized fields to.
 * @throws IOException if writing to <code>out</code> fails.
 */
public void write(DataOutput out) throws IOException {
this.serverAddress.write(out);
out.writeLong(this.startCode);
this.load.write(out);
out.writeInt(this.infoPort);
out.writeUTF(hostname);
}
/**
 * Orders instances by their servername.
 * @param o Instance to compare against.
 * @return Result of comparing this servername with that of <code>o</code>.
 */
public int compareTo(HServerInfo o) {
  final String mine = this.getServerName();
  final String theirs = o.getServerName();
  return mine.compareTo(theirs);
}
}

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -127,7 +128,8 @@ public class LocalHBaseCluster implements HConstants {
return addRegionServer(this.regionThreads.size());
}
public JVMClusterUtil.RegionServerThread addRegionServer(final int index) throws IOException {
public JVMClusterUtil.RegionServerThread addRegionServer(final int index)
throws IOException {
JVMClusterUtil.RegionServerThread rst = JVMClusterUtil.createRegionServerThread(this.conf,
this.regionServerClass, index);
this.regionThreads.add(rst);
@ -156,6 +158,20 @@ public class LocalHBaseCluster implements HConstants {
return Collections.unmodifiableList(this.regionThreads);
}
/**
 * Filters the region server threads of this cluster down to those whose
 * thread is still alive.
 * @return List of running servers (Some servers may have been killed or
 * aborted during lifetime of cluster; these servers are not included in this
 * list).
 */
public List<JVMClusterUtil.RegionServerThread> getLiveRegionServers() {
  List<JVMClusterUtil.RegionServerThread> running =
    new ArrayList<JVMClusterUtil.RegionServerThread>();
  for (JVMClusterUtil.RegionServerThread candidate: getRegionServers()) {
    if (!candidate.isAlive()) {
      // Thread has exited; leave it out of the result.
      continue;
    }
    running.add(candidate);
  }
  return running;
}
/**
* Wait for the specified region server to stop
* Removes this thread from list of running threads.

View File

@ -114,7 +114,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
// Use AtomicBoolean rather than plain boolean because we want other threads
// able to set shutdown flag. Using AtomicBoolean can pass a reference
// rather than have them have to know about the hosting Master class.
private final AtomicBoolean closed = new AtomicBoolean(true);
final AtomicBoolean closed = new AtomicBoolean(true);
// TODO: Is this separate flag necessary?
private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
@ -515,7 +515,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
HRegionInterface hri =
this.connection.getHRegionConnection(address, false);
HServerInfo info = hri.getHServerInfo();
LOG.debug("Inspection found server " + info.getName());
LOG.debug("Inspection found server " + info.getServerName());
this.serverManager.recordNewServer(info, true);
regions = hri.getRegionsAssignment();
} catch (IOException e) {

View File

@ -54,7 +54,6 @@ class ProcessServerShutdown extends RegionServerOperation {
private boolean rootRescanned;
private HServerAddress deadServerAddress;
private static class ToDoEntry {
boolean regionOffline;
final HRegionInfo info;
@ -85,24 +84,19 @@ class ProcessServerShutdown extends RegionServerOperation {
}
private void closeMetaRegions() {
isRootServer = master.getRegionManager().isRootServer(deadServerAddress);
if (isRootServer) {
master.getRegionManager().unsetRootRegion();
}
else {
//HBASE-1928: Check whether this server has been transitioning the ROOT table
isRootServer = master.getRegionManager().isRootServerCandidate (deadServer);
if (isRootServer) {
master.getRegionManager().unsetRootRegion();
}
this.isRootServer =
this.master.getRegionManager().isRootServer(this.deadServerAddress) ||
this.master.getRegionManager().isRootServerCandidate (deadServer);
if (this.isRootServer) {
this.master.getRegionManager().unsetRootRegion();
}
List<byte[]> metaStarts =
this.master.getRegionManager().listMetaRegionsForServer(deadServerAddress);
List<byte[]> metaStarts = master.getRegionManager().listMetaRegionsForServer(deadServerAddress);
metaRegions = new ArrayList<MetaRegion>();
for (byte [] region : metaStarts) {
MetaRegion r = master.getRegionManager().offlineMetaRegion(region);
metaRegions.add(r);
this.metaRegions = new ArrayList<MetaRegion>();
for (byte [] startKey: metaStarts) {
MetaRegion r = master.getRegionManager().offlineMetaRegionWithStartKey(startKey);
this.metaRegions.add(r);
}
//HBASE-1928: Check whether this server has been transitioning the META table
@ -194,7 +188,7 @@ class ProcessServerShutdown extends RegionServerOperation {
Bytes.toString(info.getRegionName()) +
" from online meta regions");
}
master.getRegionManager().offlineMetaRegion(info.getStartKey());
master.getRegionManager().offlineMetaRegionWithStartKey(info.getStartKey());
}
ToDoEntry todo = new ToDoEntry(info);

View File

@ -786,11 +786,12 @@ public class RegionManager implements HConstants {
/**
* Set an online MetaRegion offline - remove it from the map.
* @param startKey region name
* @param startKey Startkey to use finding region to remove.
* @return the MetaRegion that was taken offline.
*/
public MetaRegion offlineMetaRegion(byte [] startKey) {
LOG.info("META region removed from onlineMetaRegions");
public MetaRegion offlineMetaRegionWithStartKey(byte [] startKey) {
LOG.info("META region whose startkey is " + Bytes.toString(startKey) +
" removed from onlineMetaRegions");
return onlineMetaRegions.remove(startKey);
}
@ -920,7 +921,7 @@ public class RegionManager implements HConstants {
for ( MetaRegion region : onlineMetaRegions.values() ) {
if (server.equals(region.getServer())) {
LOG.info("Offlining META region: " + region);
offlineMetaRegion(region.getStartKey());
offlineMetaRegionWithStartKey(region.getStartKey());
// Set for reassignment.
setUnassigned(region.getRegionInfo(), true);
hasMeta = true;
@ -1379,7 +1380,7 @@ public class RegionManager implements HConstants {
}
// check if current server is overloaded
int numRegionsToClose = balanceFromOverloaded(servLoad, avg);
int numRegionsToClose = balanceFromOverloaded(info, servLoad, avg);
// check if we can unload server by low loaded servers
if(numRegionsToClose <= 0) {
@ -1401,13 +1402,14 @@ public class RegionManager implements HConstants {
* Check if server load is not overloaded (with load > avgLoadPlusSlop).
* @return number of regions to unassign.
*/
private int balanceFromOverloaded(HServerLoad srvLoad, double avgLoad) {
private int balanceFromOverloaded(final HServerInfo info,
final HServerLoad srvLoad, final double avgLoad) {
int avgLoadPlusSlop = (int)Math.ceil(avgLoad * (1 + this.slop));
int numSrvRegs = srvLoad.getNumberOfRegions();
if (numSrvRegs > avgLoadPlusSlop) {
if (LOG.isDebugEnabled()) {
LOG.debug("Server is overloaded: load=" + numSrvRegs +
", avg=" + avgLoad + ", slop=" + this.slop);
LOG.debug("Server " + info.getServerName() + " is overloaded: load=" +
numSrvRegs + ", avg=" + avgLoad + ", slop=" + this.slop);
}
return numSrvRegs - (int)Math.ceil(avgLoad);
}

View File

@ -33,6 +33,8 @@ abstract class RegionServerOperation implements Delayed, HConstants {
private long expire;
protected final HMaster master;
/* How long we stay on queue.
*/
private int delay;
protected RegionServerOperation(HMaster master) {

View File

@ -117,12 +117,14 @@ public class ServerManager implements HConstants {
if (numDeadServers > 0) {
StringBuilder sb = new StringBuilder("Dead Server [");
boolean first = true;
for (String server: deadServers) {
if (!first) {
sb.append(", ");
first = false;
synchronized (deadServers) {
for (String server: deadServers) {
if (!first) {
sb.append(", ");
first = false;
}
sb.append(server);
}
sb.append(server);
}
sb.append("]");
deadServersList = sb.toString();
@ -159,47 +161,37 @@ public class ServerManager implements HConstants {
/**
* Let the server manager know a new regionserver has come online
* @param serverInfo
* @throws Leases.LeaseStillHeldException
* @throws IOException
*/
void regionServerStartup(final HServerInfo serverInfo)
throws Leases.LeaseStillHeldException {
throws IOException {
// Test for case where we get a region startup message from a regionserver
// that has been quickly restarted but whose znode expiration handler has
// not yet run, or from a server whose fail we are currently processing.
HServerInfo info = new HServerInfo(serverInfo);
String serverName = info.getServerName();
if (this.serversToServerInfo.containsKey(serverName) ||
this.deadServers.contains(serverName)) {
LOG.debug("Server start was rejected: " + serverInfo);
LOG.debug("serversToServerInfo.containsKey: " +
this.serversToServerInfo.containsKey(serverName));
LOG.debug("deadServers.contains: " +
this.deadServers.contains(serverName));
// TODO: Check zk instead.
throw new Leases.LeaseStillHeldException(serverName);
}
LOG.info("Received start message from: " + serverName);
// Go on to process the regionserver registration.
HServerLoad load = this.serversToLoad.remove(serverName);
if (load != null) {
// The startup message was from a known server.
// Remove stale information about the server's load.
synchronized (this.loadToServers) {
Set<String> servers = loadToServers.get(load);
if (servers != null) {
servers.remove(serverName);
if (servers.size() > 0)
this.loadToServers.put(load, servers);
else
this.loadToServers.remove(load);
}
String hostAndPort = info.getServerAddress().toString();
HServerInfo existingServer =
this.serverAddressToServerInfo.get(info.getServerAddress());
if (existingServer != null) {
LOG.info("Server start rejected; we already have " + hostAndPort +
" registered; existingServer=" + existingServer + ", newServer=" + info);
if (existingServer.getStartCode() < info.getStartCode()) {
LOG.info("Triggering server recovery; existingServer looks stale");
expireServer(existingServer);
}
throw new Leases.LeaseStillHeldException(hostAndPort);
}
HServerInfo storedInfo = this.serversToServerInfo.remove(serverName);
if (storedInfo != null && !this.master.isClosed()) {
// The startup message was from a known server with the same name.
// Timeout the old one right away.
this.master.getRegionManager().getRootRegionLocation();
RegionServerOperation op = new ProcessServerShutdown(master, storedInfo);
this.master.getRegionServerOperationQueue().put(op);
if (isDead(hostAndPort, true)) {
LOG.debug("Server start rejected; currently processing " + hostAndPort +
" failure");
throw new Leases.LeaseStillHeldException(hostAndPort);
}
if (isDead(hostAndPort, true)) {
LOG.debug("Server start rejected; currently processing " + hostAndPort +
" failure");
throw new Leases.LeaseStillHeldException(hostAndPort);
}
LOG.info("Received start message from: " + info.getServerName());
recordNewServer(info);
}
@ -223,7 +215,7 @@ public class ServerManager implements HConstants {
info.setLoad(load);
// We must set this watcher here because it can be set on a fresh start
// or on a failover
Watcher watcher = new ServerExpirer(serverName, info.getServerAddress());
Watcher watcher = new ServerExpirer(new HServerInfo(info));
this.master.getZooKeeperWrapper().updateRSLocationGetWatch(info, watcher);
this.serversToServerInfo.put(serverName, info);
this.serverAddressToServerInfo.put(info.getServerAddress(), info);
@ -318,7 +310,7 @@ public class ServerManager implements HConstants {
synchronized (this.serversToServerInfo) {
removeServerInfo(info.getServerName(), info.getServerAddress());
this.serversToServerInfo.notifyAll();
notifyServers();
}
return new HMsg[] {HMsg.REGIONSERVER_STOP};
@ -337,53 +329,44 @@ public class ServerManager implements HConstants {
*/
private void processRegionServerExit(HServerInfo serverInfo, HMsg[] msgs) {
synchronized (this.serversToServerInfo) {
try {
// This method removes ROOT/META from the list and marks them to be reassigned
// in addition to other housework.
if (removeServerInfo(serverInfo.getServerName(),
serverInfo.getServerAddress())) {
// Only process the exit message if the server still has registered info.
// Otherwise we could end up processing the server exit twice.
LOG.info("Region server " + serverInfo.getServerName() +
": MSG_REPORT_EXITING");
// Get all the regions the server was serving reassigned
// (if we are not shutting down).
if (!this.master.isClosed()) {
for (int i = 1; i < msgs.length; i++) {
LOG.info("Processing " + msgs[i] + " from " +
serverInfo.getServerName());
assert msgs[i].getType() == HMsg.Type.MSG_REGION_CLOSE;
HRegionInfo info = msgs[i].getRegionInfo();
// Meta/root region offlining is handed in removeServerInfo above.
if (!info.isMetaRegion()) {
synchronized (this.master.getRegionManager()) {
if (!this.master.getRegionManager().isOfflined(
info.getRegionNameAsString())) {
this.master.getRegionManager().setUnassigned(info, true);
} else {
this.master.getRegionManager().removeRegion(info);
}
// This method removes ROOT/META from the list and marks them to be
// reassigned in addition to other housework.
if (removeServerInfo(serverInfo.getServerName(), serverInfo.getServerAddress())) {
// Only process the exit message if the server still has registered info.
// Otherwise we could end up processing the server exit twice.
LOG.info("Region server " + serverInfo.getServerName() +
": MSG_REPORT_EXITING");
// Get all the regions the server was serving reassigned
// (if we are not shutting down).
if (!master.closed.get()) {
for (int i = 1; i < msgs.length; i++) {
LOG.info("Processing " + msgs[i] + " from " +
serverInfo.getServerName());
assert msgs[i].getType() == HMsg.Type.MSG_REGION_CLOSE;
HRegionInfo info = msgs[i].getRegionInfo();
// Meta/root region offlining is handled in removeServerInfo above.
if (!info.isMetaRegion()) {
synchronized (master.getRegionManager()) {
if (!master.getRegionManager().isOfflined(info.getRegionNameAsString())) {
master.getRegionManager().setUnassigned(info, true);
} else {
master.getRegionManager().removeRegion(info);
}
}
}
}
// There should not be any regions in transition for this server - the
// server should finish transitions itself before closing
Map<String, RegionState> inTransition =
master.getRegionManager().getRegionsInTransitionOnServer(
serverInfo.getServerName());
for (Map.Entry<String, RegionState> entry : inTransition.entrySet()) {
LOG.warn("Region server " + serverInfo.getServerName() +
" shut down with region " + entry.getKey() + " in transition " +
"state " + entry.getValue());
master.getRegionManager().setUnassigned(entry.getValue().getRegionInfo(), true);
}
}
// We don't need to return anything to the server because it isn't
// going to do any more work.
} finally {
this.serversToServerInfo.notifyAll();
// There should not be any regions in transition for this server - the
// server should finish transitions itself before closing
Map<String, RegionState> inTransition = master.getRegionManager()
.getRegionsInTransitionOnServer(serverInfo.getServerName());
for (Map.Entry<String, RegionState> entry : inTransition.entrySet()) {
LOG.warn("Region server " + serverInfo.getServerName()
+ " shut down with region " + entry.getKey() + " in transition "
+ "state " + entry.getValue());
master.getRegionManager().setUnassigned(entry.getValue().getRegionInfo(),
true);
}
}
}
}
@ -526,7 +509,7 @@ public class ServerManager implements HConstants {
assignSplitDaughter(b);
if (region.isMetaTable()) {
// A meta region has split.
this.master.getRegionManager().offlineMetaRegion(region.getStartKey());
this. master.getRegionManager().offlineMetaRegionWithStartKey(region.getStartKey());
this.master.getRegionManager().incrementNumMetaRegions();
}
}
@ -648,7 +631,7 @@ public class ServerManager implements HConstants {
} else if (region.isMetaTable()) {
// Region is part of the meta table. Remove it from onlineMetaRegions
this.master.getRegionManager().offlineMetaRegion(region.getStartKey());
this.master.getRegionManager().offlineMetaRegionWithStartKey(region.getStartKey());
}
boolean offlineRegion =
@ -816,44 +799,58 @@ public class ServerManager implements HConstants {
/** Watcher triggered when a RS znode is deleted */
private class ServerExpirer implements Watcher {
private String server;
private HServerAddress serverAddress;
private HServerInfo server;
ServerExpirer(String server, HServerAddress serverAddress) {
this.server = server;
this.serverAddress = serverAddress;
ServerExpirer(final HServerInfo hsi) {
this.server = hsi;
}
public void process(WatchedEvent event) {
if (event.getType().equals(EventType.NodeDeleted)) {
LOG.info(server + " znode expired");
// Remove the server from the known servers list and update load info
serverAddressToServerInfo.remove(serverAddress);
HServerInfo info = serversToServerInfo.remove(server);
if (info != null) {
String serverName = info.getServerName();
HServerLoad load = serversToLoad.remove(serverName);
if (load != null) {
synchronized (loadToServers) {
Set<String> servers = loadToServers.get(load);
if (servers != null) {
servers.remove(serverName);
if(servers.size() > 0)
loadToServers.put(load, servers);
else
loadToServers.remove(load);
}
}
}
deadServers.add(server);
RegionServerOperation op = new ProcessServerShutdown(master, info);
master.getRegionServerOperationQueue().put(op);
}
synchronized (serversToServerInfo) {
serversToServerInfo.notifyAll();
if (!event.getType().equals(EventType.NodeDeleted)) {
LOG.warn("Unexpected event=" + event);
return;
}
LOG.info(this.server.getServerName() + " znode expired");
expireServer(this.server);
}
}
/*
* Expire the passed server. Removes it from the maps of known servers and
* their loads, adds its servername to the set of dead servers and queues a
* ProcessServerShutdown operation for it.
* Does nothing other than log a warning if the server is not currently
* registered or if its servername is already in the dead servers set.
* @param hsi Server to expire; only its servername is used for the lookup.
*/
private synchronized void expireServer(final HServerInfo hsi) {
// First check there is a server to expire. ServerName is of the form:
// <hostname> , <port> , <startcode>
String serverName = hsi.getServerName();
HServerInfo info = this.serversToServerInfo.get(serverName);
if (info == null) {
// Nothing registered under this servername; nothing to expire.
LOG.warn("No HServerInfo for " + serverName);
return;
}
if (this.deadServers.contains(serverName)) {
// Shutdown of this server has already been queued; do not queue it twice.
LOG.warn("Already processing shutdown of " + serverName);
return;
}
// Remove the server from the known servers lists and update load info
this.serverAddressToServerInfo.remove(info.getServerAddress());
this.serversToServerInfo.remove(serverName);
HServerLoad load = this.serversToLoad.remove(serverName);
if (load != null) {
synchronized (this.loadToServers) {
Set<String> servers = this.loadToServers.get(load);
if (servers != null) {
servers.remove(serverName);
// Drop the load entry once no server is left carrying that load.
if (servers.isEmpty()) this.loadToServers.remove(load);
}
}
}
// Add to dead servers and queue a shutdown processing.
LOG.debug("Added=" + serverName +
" to dead servers, added shutdown processing operation");
this.deadServers.add(serverName);
// Queue with the registered info looked up above rather than the passed hsi.
this.master.getRegionServerOperationQueue().
put(new ProcessServerShutdown(master, info));
}
/**
@ -867,8 +864,33 @@ public class ServerManager implements HConstants {
* @param serverName
* @return true if server is dead
*/
boolean isDead(String serverName) {
return this.deadServers.contains(serverName);
public boolean isDead(final String serverName) {
return isDead(serverName, false);
}
/**
 * Checks this manager's set of dead servers for the named server.
 * @param serverName Servername as either <code>host:port</code> or
 * <code>host,port,startcode</code>.
 * @param hostAndPortOnly True if <code>serverName</code> is host and
 * port only (<code>host:port</code>) and if so, then we do a prefix compare
 * (ignoring start codes) looking for dead server.
 * @return true if server is dead
 */
boolean isDead(final String serverName, final boolean hostAndPortOnly) {
  final boolean dead = isDead(this.deadServers, serverName, hostAndPortOnly);
  return dead;
}
/**
 * Looks for <code>serverName</code> in the passed set of dead servers.
 * @param deadServers Servernames of dead servers, each expected to be
 * formatted as <code>host,port,startcode</code>.
 * @param serverName Servername as either <code>host:port</code> or
 * <code>host,port,startcode</code>.
 * @param hostAndPortOnly True if <code>serverName</code> is
 * <code>host:port</code>; each dead server is then compared with its
 * startcode stripped. If false, an exact match is required.
 * @return true if server is dead
 */
static boolean isDead(final Set<String> deadServers,
    final String serverName, final boolean hostAndPortOnly) {
  if (!hostAndPortOnly) return deadServers.contains(serverName);
  final String separator = HServerInfo.SERVERNAME_SEPARATOR;
  // The port follows the LAST colon, same as HServerAddress(String) parses
  // it, so swap that colon (not the first one) for the servername separator.
  final int colon = serverName.lastIndexOf(':');
  final String hostAndPort = colon < 0 ? serverName :
    serverName.substring(0, colon) + separator + serverName.substring(colon + 1);
  // Hold the set's monitor while iterating, as the other iteration over
  // dead servers in this class does. NOTE(review): presumably the set is a
  // synchronized wrapper -- confirm against its declaration.
  synchronized (deadServers) {
    for (String hostPortStartCode: deadServers) {
      int index = hostPortStartCode.lastIndexOf(separator);
      // Skip entries without a separator rather than failing on
      // substring(0, -1); such an entry cannot match host and port anyway.
      if (index < 0) continue;
      String hostPortStrippedOfStartCode = hostPortStartCode.substring(0, index);
      if (hostPortStrippedOfStartCode.equals(hostAndPort)) return true;
    }
  }
  return false;
}
Set<String> getDeadServers() {

View File

@ -133,6 +133,8 @@ public class HRegionServer implements HConstants, HRegionInterface,
// debugging and unit tests.
protected volatile boolean abortRequested;
private volatile boolean killed = false;
// If false, the file system has become unavailable
protected volatile boolean fsOk;
@ -612,7 +614,9 @@ public class HRegionServer implements HConstants, HRegionInterface,
hlogRoller.interruptIfNecessary();
this.majorCompactionChecker.interrupt();
if (abortRequested) {
if (killed) {
// Just skip out w/o closing regions.
} else if (abortRequested) {
if (this.fsOk) {
// Only try to clean up if the file system is available
try {
@ -665,9 +669,10 @@ public class HRegionServer implements HConstants, HRegionInterface,
this.hbaseMaster = null;
}
join();
this.zooKeeperWrapper.close();
if (!killed) {
join();
this.zooKeeperWrapper.close();
}
LOG.info(Thread.currentThread().getName() + " exiting");
}
@ -1208,6 +1213,16 @@ public class HRegionServer implements HConstants, HRegionInterface,
stop();
}
/*
* Simulate a kill -9 of this server.
* Exits w/o closing regions or cleaning up logs but it does close socket in
* case want to bring up server on old hostname+port immediately.
* Works by raising the <code>killed</code> flag and then calling abort().
*/
protected void kill() {
// Raise the flag before aborting; presumably this is what lets the shutdown
// path that tests <code>killed</code> skip closing regions -- confirm
// against run().
this.killed = true;
abort();
}
/**
* Wait on all threads to finish.
* Presumption is that all closes and stops have already been called.

View File

@ -28,23 +28,35 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
/**
* This class creates a single process HBase cluster. One thread is run for
* each server started. Pass how many instances of a RegionServer you want
* running in your cluster-in-a-single-jvm. Its modeled on MiniDFSCluster.
* Uses {@link LocalHBaseCluster}. Will run on top of whatever the currently
* configured FileSystem.
* This class creates a single process HBase cluster.
* each server. The master uses the 'default' FileSystem. The RegionServers,
* if we are running on DistributedFilesystem, create a FileSystem instance
* each and will close down their instance on the way out.
*/
public class MiniHBaseCluster implements HConstants {
static final Log LOG = LogFactory.getLog(MiniHBaseCluster.class.getName());
// Cache this. For some reason only works first time I get it. TODO: Figure
// out why.
private final static UserGroupInformation UGI;
static {
UGI = UserGroupInformation.getCurrentUGI();
}
private Configuration conf;
public LocalHBaseCluster hbaseCluster;
@ -110,12 +122,74 @@ public class MiniHBaseCluster implements HConstants {
}
/**
* Subclass so can get at protected methods (none at moment).
* Subclass so can get at protected methods (none at moment). Also, creates
* a FileSystem instance per instantiation. Adds a shutdown hook that closes
* its own FileSystem on the way out. Shuts down own FileSystem only, not all
* filesystems as the FileSystem system exit hook does.
*/
public static class MiniHBaseClusterRegionServer extends HRegionServer {
private static int index = 0;
public MiniHBaseClusterRegionServer(Configuration conf)
throws IOException {
super(conf);
super(setDifferentUser(conf));
}
/*
 * Picks the Configuration a new regionserver should be constructed with.
 * @param c Configuration to start from.
 * @return <code>c</code> untouched when its FileSystem is not a
 * DistributedFileSystem; otherwise a copy of <code>c</code> that carries a
 * user name unique to this daemon.
 * @throws IOException
 */
private static Configuration setDifferentUser(final Configuration c)
    throws IOException {
  final boolean onDistributedFs =
    FileSystem.get(c) instanceof DistributedFileSystem;
  if (onDistributedFs) {
    // Distributed filesystem. Make a new instance per daemon. Below
    // code is taken from the AppendTestUtil over in hdfs.
    final Configuration perDaemonConf = new Configuration(c);
    final String daemonUser = UGI.getUserName() + ".hrs." + index++;
    UnixUserGroupInformation.saveToConf(perDaemonConf,
      UnixUserGroupInformation.UGI_PROPERTY_NAME,
      new UnixUserGroupInformation(daemonUser, new String[]{"supergroup"}));
    return perDaemonConf;
  }
  return c;
}
/**
 * Runs the parent initialization, then, if this server's filesystem is a
 * DistributedFileSystem, registers a JVM shutdown hook that closes just
 * that filesystem.
 * @param c Passed through to the parent <code>init</code>.
 * @throws IOException
 */
@Override
protected void init(MapWritable c) throws IOException {
  super.init(c);
  if (!(getFileSystem() instanceof DistributedFileSystem)) {
    // Not on a distributed filesystem; no hook is added.
    return;
  }
  // Hook shuts down only the FileSystem this server holds.
  final Thread hook = new SingleFileSystemShutdownThread(getFileSystem());
  Runtime.getRuntime().addShutdownHook(hook);
}
/**
 * Widens the parent's protected <code>kill()</code> to public so it can be
 * called from outside the class hierarchy; simply delegates to the parent.
 */
@Override
public void kill() {
  super.kill();
}
}
/**
 * Replacement shutdown hook.
 * Closes only the one filesystem handed to it, where the default filesystem
 * hook shuts down all of them.
 */
static class SingleFileSystemShutdownThread extends Thread {
  /** The filesystem this hook closes when run. */
  private final FileSystem fileSystem;

  /**
   * @param fs Filesystem to close; also used to name the thread.
   */
  SingleFileSystemShutdownThread(final FileSystem fs) {
    super("Shutdown of " + fs);
    this.fileSystem = fs;
  }

  /**
   * Closes the filesystem; an IOException from the close is logged as a
   * warning and not rethrown.
   */
  @Override
  public void run() {
    try {
      LOG.info("Hook closing fs=" + this.fileSystem);
      this.fileSystem.close();
    } catch (IOException e) {
      LOG.warn("Running hook", e);
    }
  }
}
@ -178,10 +252,6 @@ public class MiniHBaseCluster implements HConstants {
*/
public String abortRegionServer(int serverNumber) {
HRegionServer server = getRegionServer(serverNumber);
/*TODO: Prove not needed in TRUNK
// // Don't run hdfs shutdown thread.
// server.setHDFSShutdownThreadOnExit(null);
*/
LOG.info("Aborting " + server.toString());
server.abort();
return server.toString();
@ -212,10 +282,6 @@ public class MiniHBaseCluster implements HConstants {
JVMClusterUtil.RegionServerThread server =
hbaseCluster.getRegionServers().get(serverNumber);
LOG.info("Stopping " + server.toString());
if (!shutdownFS) {
// Stop the running of the hdfs shutdown thread in tests.
server.getRegionServer().setShutdownHDFS(false);
}
server.getRegionServer().stop();
return server;
}
@ -268,6 +334,13 @@ public class MiniHBaseCluster implements HConstants {
return this.hbaseCluster.getRegionServers();
}
/**
 * Asks the wrapped cluster for its live region servers.
 * @return List of live region server threads (skips the aborted and the killed)
 */
public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() {
  final List<JVMClusterUtil.RegionServerThread> liveThreads =
    this.hbaseCluster.getLiveRegionServers();
  return liveThreads;
}
/**
* Grab a numbered region server of your choice.
* @param serverNumber

View File

@ -19,15 +19,19 @@
*/
package org.apache.hadoop.hbase.master;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.BindException;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HMsg;
@ -49,6 +53,7 @@ import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Writables;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@ -56,8 +61,8 @@ import org.junit.Test;
* Test transitions of state across the master. Sets up the cluster once and
* then runs a couple of tests.
*/
public class TestMasterTransistions {
private static final Log LOG = LogFactory.getLog(TestMasterTransistions.class);
public class TestMasterTransitions {
private static final Log LOG = LogFactory.getLog(TestMasterTransitions.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final String TABLENAME = "master_transitions";
private static final byte [][] FAMILIES = new byte [][] {Bytes.toBytes("a"),
@ -68,6 +73,7 @@ public class TestMasterTransistions {
* @throws Exception
*/
@BeforeClass public static void beforeAllTests() throws Exception {
TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
// Start a cluster of two regionservers.
TEST_UTIL.startMiniCluster(2);
// Create a table of three families. This will assign a region.
@ -82,148 +88,13 @@ public class TestMasterTransistions {
TEST_UTIL.shutdownMiniCluster();
}
/**
* HBase2482 is about outstanding region openings. If any are outstanding
* when a regionserver goes down, then they'll never deploy. They'll be
* stuck in the regions-in-transition list for ever. This listener looks
* for a region opening HMsg and if its from the server passed on construction,
* then we kill it. It also looks out for a close message on the victim
* server because that signifies start of the fireworks.
*/
static class HBase2482Listener implements RegionServerOperationListener {
private final HRegionServer victim;
private boolean abortSent = false;
// We closed regions on new server.
private volatile boolean closed = false;
// Copy of regions on new server
private final Collection<HRegion> copyOfOnlineRegions;
// This is the region that was in transition on the server we aborted. Test
// passes if this region comes back online successfully.
private HRegionInfo regionToFind;
HBase2482Listener(final HRegionServer victim) {
this.victim = victim;
// Copy regions currently open on this server so I can notice when
// there is a close.
this.copyOfOnlineRegions =
this.victim.getCopyOfOnlineRegionsSortedBySize().values();
@Before public void setup() throws IOException {
if (TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().size() < 2) {
// Need at least two servers.
LOG.info("Started new server=" +
TEST_UTIL.getHBaseCluster().startRegionServer());
}
@Override
public boolean process(HServerInfo serverInfo, HMsg incomingMsg) {
if (!victim.getServerInfo().equals(serverInfo) ||
this.abortSent || !this.closed) {
return true;
}
if (!incomingMsg.isType(HMsg.Type.MSG_REPORT_PROCESS_OPEN)) return true;
// Save the region that is in transition so can test later it came back.
this.regionToFind = incomingMsg.getRegionInfo();
LOG.info("ABORTING " + this.victim + " because got a " +
HMsg.Type.MSG_REPORT_PROCESS_OPEN + " on this server for " +
incomingMsg.getRegionInfo().getRegionNameAsString());
this.victim.abort();
this.abortSent = true;
return true;
}
@Override
public boolean process(RegionServerOperation op) throws IOException {
return true;
}
@Override
public void processed(RegionServerOperation op) {
if (this.closed || !(op instanceof ProcessRegionClose)) return;
ProcessRegionClose close = (ProcessRegionClose)op;
for (HRegion r: this.copyOfOnlineRegions) {
if (r.getRegionInfo().equals(close.regionInfo)) {
// We've closed one of the regions that was on the victim server.
// Now can start testing for when all regions are back online again
LOG.info("Found close of " +
r.getRegionInfo().getRegionNameAsString() +
"; setting close happened flag");
this.closed = true;
break;
}
}
}
}
/**
* In 2482, a RS with an opening region on it dies. The said region is then
* stuck in the master's regions-in-transition and never leaves it. This
* test works by bringing up a new regionserver, waiting for the load
* balancer to give it some regions. Then, we close all on the new server.
* After sending all the close messages, we send the new regionserver the
* special blocking message so it can not process any more messages.
* Meantime reopening of the just-closed regions is backed up on the new
* server. Soon as master gets an opening region from the new regionserver,
* we kill it. We then wait on all regions to combe back on line. If bug
* is fixed, this should happen soon as the processing of the killed server is
* done.
* @see <a href="https://issues.apache.org/jira/browse/HBASE-2482">HBASE-2482</a>
*/
@Test public void testKillRSWithOpeningRegion2482() throws Exception {
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
// Count how many regions are online. They need to be all back online for
// this test to succeed.
int countOfMetaRegions = countOfMetaRegions();
// Add a listener on the server.
HMaster m = cluster.getMaster();
// Start new regionserver.
MiniHBaseClusterRegionServer hrs =
(MiniHBaseClusterRegionServer)cluster.startRegionServer().getRegionServer();
LOG.info("Started new regionserver: " + hrs.toString());
// Wait until has some regions before proceeding. Balancer will give it some.
int minimumRegions =
countOfMetaRegions/(cluster.getRegionServerThreads().size() * 2);
while (hrs.getOnlineRegions().size() < minimumRegions) Threads.sleep(100);
// Set the listener only after some regions have been opened on new server.
HBase2482Listener listener = new HBase2482Listener(hrs);
m.getRegionServerOperationQueue().
registerRegionServerOperationListener(listener);
try {
// Go close all non-catalog regions on this new server
closeAlltNonCatalogRegions(cluster, hrs);
// After all closes, add blocking message before the region opens start to
// come in.
cluster.addMessageToSendRegionServer(hrs,
new HMsg(HMsg.Type.TESTING_MSG_BLOCK_RS));
// Wait till one of the above close messages has an effect before we start
// wait on all regions back online.
while (!listener.closed) Threads.sleep(100);
LOG.info("Past close");
// Make sure the abort server message was sent.
while(!listener.abortSent) Threads.sleep(100);
LOG.info("Past abort send; waiting on all regions to redeploy");
// Now wait for regions to come back online.
assertRegionIsBackOnline(listener.regionToFind);
} finally {
m.getRegionServerOperationQueue().
unregisterRegionServerOperationListener(listener);
}
}
/*
* @param cluster
* @param hrs
* @return Count of regions closed.
* @throws IOException
*/
private int closeAlltNonCatalogRegions(final MiniHBaseCluster cluster,
final MiniHBaseCluster.MiniHBaseClusterRegionServer hrs)
throws IOException {
int countOfRegions = 0;
for (HRegion r: hrs.getOnlineRegions()) {
if (r.getRegionInfo().isMetaRegion()) continue;
cluster.addMessageToSendRegionServer(hrs,
new HMsg(HMsg.Type.MSG_REGION_CLOSE, r.getRegionInfo()));
LOG.info("Sent close of " + r.getRegionInfo().getRegionNameAsString() +
" on " + hrs.toString());
countOfRegions++;
}
return countOfRegions;
}
/**
@ -247,7 +118,7 @@ public class TestMasterTransistions {
private int closeCount = 0;
static final int SERVER_DURATION = 3 * 1000;
static final int CLOSE_DURATION = 1 * 1000;
HBase2428Listener(final MiniHBaseCluster c, final HServerAddress metaAddress,
final HRegionInfo closingHRI, final int otherServerIndex) {
this.cluster = c;
@ -332,9 +203,11 @@ public class TestMasterTransistions {
/**
* In 2428, the meta region has just been set offline and then a close comes
* in.
* @see <a href="https://issues.apache.org/jira/browse/HBASE-2428">HBASE-2428</a>
* @see <a href="https://issues.apache.org/jira/browse/HBASE-2428">HBASE-2428</a>
*/
@Test public void testRegionCloseWhenNoMetaHBase2428() throws Exception {
@Test (timeout=300000) public void testRegionCloseWhenNoMetaHBase2428()
throws Exception {
LOG.info("Running testRegionCloseWhenNoMetaHBase2428");
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
final HMaster master = cluster.getMaster();
int metaIndex = cluster.getServerWithMeta();
@ -351,14 +224,14 @@ public class TestMasterTransistions {
// Get a region out on the otherServer.
final HRegionInfo hri =
otherServer.getOnlineRegions().iterator().next().getRegionInfo();
// Add our ReionServerOperationsListener
// Add our RegionServerOperationsListener
HBase2428Listener listener = new HBase2428Listener(cluster,
metaHRS.getHServerInfo().getServerAddress(), hri, otherServerIndex);
master.getRegionServerOperationQueue().
registerRegionServerOperationListener(listener);
try {
// Now close the server carrying index.
// Now close the server carrying meta.
cluster.abortRegionServer(metaIndex);
// First wait on receipt of meta server shutdown message.
@ -379,6 +252,202 @@ public class TestMasterTransistions {
}
}
/**
 * Test adding in a new server before old one on same host+port is dead.
 * Make the test more onerous by having the server under test carry the meta.
 * If confusion between old and new, purportedly meta never comes back. Test
 * that meta gets redeployed.
 * <p>The server carrying meta is killed, a replacement is started on the
 * same port, and the row count of the test table must match the count taken
 * before the kill. The regionserver port setting is restored on the way out.
 * @throws IOException
 */
@Test (timeout=300000) public void testAddingServerBeforeOldIsDead2413()
    throws IOException {
  LOG.info("Running testAddingServerBeforeOldIsDead2413");
  MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
  int count = count();
  int metaIndex = cluster.getServerWithMeta();
  MiniHBaseClusterRegionServer metaHRS =
    (MiniHBaseClusterRegionServer)cluster.getRegionServer(metaIndex);
  int port = metaHRS.getServerInfo().getServerAddress().getPort();
  Configuration c = TEST_UTIL.getConfiguration();
  String oldPort = c.get(HConstants.REGIONSERVER_PORT, "0");
  try {
    LOG.info("KILLED=" + metaHRS);
    metaHRS.kill();
    c.set(HConstants.REGIONSERVER_PORT, Integer.toString(port));
    // Try and start new regionserver.  It might clash with the old
    // regionserver port so keep trying to get past the BindException.
    HRegionServer hrs = null;
    while (true) {
      try {
        hrs = cluster.startRegionServer().getRegionServer();
        break;
      } catch (IOException e) {
        Throwable cause = e.getCause();
        boolean portClash = cause instanceof InvocationTargetException &&
          cause.getCause() instanceof BindException;
        if (portClash) {
          LOG.info("BindException; retrying: " + e.toString());
        } else {
          // Still retry (the test timeout bounds the loop) but do not hide
          // a failure that is not the expected port clash.
          LOG.warn("Unexpected failure starting regionserver; retrying", e);
        }
        // Pause between attempts rather than spin while the old port frees.
        Threads.sleep(100);
      }
    }
    LOG.info("STARTED=" + hrs);
    // Wait until he's been given at least 3 regions before we go on to try
    // and count rows in table.
    while (hrs.getOnlineRegions().size() < 3) Threads.sleep(100);
    LOG.info(hrs.toString() + " has " + hrs.getOnlineRegions().size() +
      " regions");
    assertEquals(count, count());
  } finally {
    c.set(HConstants.REGIONSERVER_PORT, oldPort);
  }
}
/**
 * HBase2482 is about outstanding region openings.  If any are outstanding
 * when a regionserver goes down, then they'll never deploy.  They'll be
 * stuck in the regions-in-transition list for ever.  This listener looks
 * for a region opening HMsg and if its from the server passed on construction,
 * then we kill it.  It also looks out for a close message on the victim
 * server because that signifies start of the fireworks.
 * <p>The <code>closed</code>, <code>abortSent</code> and
 * <code>regionToFind</code> fields are read by the test thread while the
 * listener callbacks set them, so all three are volatile.
 */
static class HBase2482Listener implements RegionServerOperationListener {
  private final HRegionServer victim;
  // Set once we have aborted the victim.  Volatile because the test thread
  // spins on it.
  private volatile boolean abortSent = false;
  // We closed regions on new server.
  private volatile boolean closed = false;
  // Copy of regions on new server
  private final Collection<HRegion> copyOfOnlineRegions;
  // This is the region that was in transition on the server we aborted.  Test
  // passes if this region comes back online successfully.  Volatile because
  // it is set in a listener callback and read by the test thread.
  private volatile HRegionInfo regionToFind;

  /**
   * @param victim Server to abort when it reports it is opening a region.
   */
  HBase2482Listener(final HRegionServer victim) {
    this.victim = victim;
    // Copy regions currently open on this server so I can notice when
    // there is a close.
    this.copyOfOnlineRegions =
      this.victim.getCopyOfOnlineRegionsSortedBySize().values();
  }

  /**
   * Aborts the victim on the first MSG_REPORT_PROCESS_OPEN it sends after
   * one of its regions has been seen to close.
   * @return Always true.
   */
  @Override
  public boolean process(HServerInfo serverInfo, HMsg incomingMsg) {
    if (!victim.getServerInfo().equals(serverInfo) ||
        this.abortSent || !this.closed) {
      return true;
    }
    if (!incomingMsg.isType(HMsg.Type.MSG_REPORT_PROCESS_OPEN)) return true;
    // Save the region that is in transition so can test later it came back.
    this.regionToFind = incomingMsg.getRegionInfo();
    LOG.info("ABORTING " + this.victim + " because got a " +
      HMsg.Type.MSG_REPORT_PROCESS_OPEN + " on this server for " +
      incomingMsg.getRegionInfo().getRegionNameAsString());
    this.victim.abort();
    this.abortSent = true;
    return true;
  }

  /**
   * @return Always true; operations are not inspected before they run.
   */
  @Override
  public boolean process(RegionServerOperation op) throws IOException {
    return true;
  }

  /**
   * Sets the closed flag once a ProcessRegionClose has been processed for
   * any region that was online on the victim at construction time.
   */
  @Override
  public void processed(RegionServerOperation op) {
    if (this.closed || !(op instanceof ProcessRegionClose)) return;
    ProcessRegionClose close = (ProcessRegionClose)op;
    for (HRegion r: this.copyOfOnlineRegions) {
      if (r.getRegionInfo().equals(close.regionInfo)) {
        // We've closed one of the regions that was on the victim server.
        // Now can start testing for when all regions are back online again
        LOG.info("Found close of " +
          r.getRegionInfo().getRegionNameAsString() +
          "; setting close happened flag");
        this.closed = true;
        break;
      }
    }
  }
}
/**
* In 2482, a RS with an opening region on it dies. The said region is then
* stuck in the master's regions-in-transition and never leaves it. This
* test works by bringing up a new regionserver, waiting for the load
* balancer to give it some regions. Then, we close all on the new server.
* After sending all the close messages, we send the new regionserver the
* special blocking message so it can not process any more messages.
* Meantime reopening of the just-closed regions is backed up on the new
* server. Soon as master gets an opening region from the new regionserver,
* we kill it. We then wait on all regions to come back on line. If bug
* is fixed, this should happen soon as the processing of the killed server is
* done.
* The listener is always unregistered from the master on the way out.
* @throws Exception
* @see <a href="https://issues.apache.org/jira/browse/HBASE-2482">HBASE-2482</a>
*/
@Test (timeout=300000) public void testKillRSWithOpeningRegion2482()
throws Exception {
LOG.info("Running testKillRSWithOpeningRegion2482");
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
if (cluster.getLiveRegionServerThreads().size() < 2) {
// Need at least two servers.
cluster.startRegionServer();
}
// Count how many regions are online. They need to be all back online for
// this test to succeed.
int countOfMetaRegions = countOfMetaRegions();
// Get the master; the listener is registered on its operation queue below.
HMaster m = cluster.getMaster();
// Start new regionserver.
MiniHBaseClusterRegionServer hrs =
(MiniHBaseClusterRegionServer)cluster.startRegionServer().getRegionServer();
LOG.info("Started new regionserver: " + hrs.toString());
// Wait until has some regions before proceeding. Balancer will give it some.
// Threshold is the region count divided by twice the number of
// regionserver threads.
int minimumRegions =
countOfMetaRegions/(cluster.getRegionServerThreads().size() * 2);
while (hrs.getOnlineRegions().size() < minimumRegions) Threads.sleep(100);
// Set the listener only after some regions have been opened on new server.
// The listener copies the server's online regions when constructed.
HBase2482Listener listener = new HBase2482Listener(hrs);
m.getRegionServerOperationQueue().
registerRegionServerOperationListener(listener);
try {
// Go close all non-catalog regions on this new server
closeAllNonCatalogRegions(cluster, hrs);
// After all closes, add blocking message before the region opens start to
// come in.
cluster.addMessageToSendRegionServer(hrs,
new HMsg(HMsg.Type.TESTING_MSG_BLOCK_RS));
// Wait till one of the above close messages has an effect before we start
// wait on all regions back online.
while (!listener.closed) Threads.sleep(100);
LOG.info("Past close");
// Make sure the abort server message was sent.
while(!listener.abortSent) Threads.sleep(100);
LOG.info("Past abort send; waiting on all regions to redeploy");
// Now wait for regions to come back online.
assertRegionIsBackOnline(listener.regionToFind);
} finally {
// Always take the listener back off the master's operation queue.
m.getRegionServerOperationQueue().
unregisterRegionServerOperationListener(listener);
}
}
/*
 * Queues a MSG_REGION_CLOSE for every region online on <code>hrs</code>
 * whose region info does not report it as a meta region.
 * @param cluster Cluster used to queue the messages.
 * @param hrs Server whose regions are to be closed.
 * @return Count of close messages queued.
 * @throws IOException
 */
private int closeAllNonCatalogRegions(final MiniHBaseCluster cluster,
    final MiniHBaseCluster.MiniHBaseClusterRegionServer hrs)
    throws IOException {
  int closesSent = 0;
  for (HRegion region: hrs.getOnlineRegions()) {
    final HRegionInfo info = region.getRegionInfo();
    if (!info.isMetaRegion()) {
      final HMsg closeMsg = new HMsg(HMsg.Type.MSG_REGION_CLOSE, info);
      cluster.addMessageToSendRegionServer(hrs, closeMsg);
      LOG.info("Sent close of " + info.getRegionNameAsString() +
        " on " + hrs.toString());
      closesSent++;
    }
  }
  return closesSent;
}
private void assertRegionIsBackOnline(final HRegionInfo hri)
throws IOException {
// Region should have an entry in its startkey because of addRowToEachRegion.
@ -414,7 +483,7 @@ public class TestMasterTransistions {
// If I get to here and all rows have a Server, then all have been assigned.
if (rows == countOfRegions) break;
LOG.info("Found=" + rows);
Threads.sleep(1000);
Threads.sleep(1000);
}
}
@ -472,6 +541,23 @@ public class TestMasterTransistions {
return rows;
}
/*
 * Runs a full scan over TABLENAME and counts the rows returned.  The scanner
 * is closed whether or not the scan completes.
 * @return Count of rows in TABLENAME
 * @throws IOException
 */
private static int count() throws IOException {
  HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
  int rows = 0;
  Scan scan = new Scan();
  ResultScanner s = t.getScanner(scan);
  try {
    while (s.next() != null) {
      rows++;
    }
  } finally {
    // Close in a finally so the scanner is released when next() throws too.
    s.close();
  }
  LOG.info("Counted=" + rows);
  return rows;
}
/*
* @param hri
* @return Start key for hri (If start key is '', then return 'aaa'.

View File

@ -62,7 +62,7 @@ public class TestRegionManager extends HBaseClusterTestCase {
HRegionInfo regionInfoX = new HRegionInfo(tableDesc, startKeyX, endKeyX);
master.getRegionManager().offlineMetaRegion(startKey0);
master.getRegionManager().offlineMetaRegionWithStartKey(startKey0);
master.getRegionManager().putMetaRegionOnline(meta0);
master.getRegionManager().putMetaRegionOnline(meta1);
master.getRegionManager().putMetaRegionOnline(meta2);

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import static org.junit.Assert.*;
import java.util.HashSet;
import java.util.Set;
import org.junit.Test;
/**
 * Tests the static dead-server lookup on ServerManager.
 */
public class TestServerManager {
  /**
   * Checks exact-name lookups and host:port (startcode-ignoring) lookups
   * against first an empty and then a one-entry dead servers set.
   */
  @Test public void testIsDead() {
    final String fullServerName = "one,123,3";
    final Set<String> dead = new HashSet<String>();
    // Nothing is dead while the set is empty, whichever way we compare.
    assertFalse(ServerManager.isDead(dead, fullServerName, false));
    assertFalse(ServerManager.isDead(dead, fullServerName, true));
    dead.add(fullServerName);
    // Exact match on the full name.
    assertTrue(ServerManager.isDead(dead, fullServerName, false));
    // Ports that are a prefix of, or have as prefix, the dead port must not match.
    assertFalse(ServerManager.isDead(dead, "one:1", true));
    assertFalse(ServerManager.isDead(dead, "one:1234", true));
    // Same host and port, startcode ignored.
    assertTrue(ServerManager.isDead(dead, "one:123", true));
  }
}