HBASE-1157, HBASE-1156 If we do not take start code as a part of region server recovery, we could inadvertently try to reassign regions assigned to a restarted server with a different start code; Improve lease handling
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@753483 13f79535-47bb-0310-9956-ffa450edef68
parent d67d131f10
commit e8520cf5d3
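The fix rests on the naming scheme this patch adds to HServerInfo: a region server is identified as hostname_startcode_port, where the start code is a timestamp taken at server startup. The sketch below is illustrative only -- the host, port, and start-code values are invented, not taken from this commit -- and shows why a restarted server on the same host:port gets a distinct identity, so the master cannot mistake the old incarnation's region assignments for the new one's.

// Illustrative sketch of the hostname_startcode_port naming scheme this patch
// introduces in HServerInfo. Host, port, and start codes are made-up example
// values; the helper mirrors the private getServerName(hostName, port, startCode)
// added in this change.
public class ServerNameExample {

  static String getServerName(String hostname, int port, long startCode) {
    return hostname + "_" + startCode + "_" + port;
  }

  public static void main(String[] args) {
    long firstStart = 1236500000000L;  // start code = startup timestamp
    long restart    = 1236500600000L;  // same host:port after a restart
    System.out.println(getServerName("rs1.example.com", 60020, firstStart));
    // rs1.example.com_1236500000000_60020
    System.out.println(getServerName("rs1.example.com", 60020, restart));
    // rs1.example.com_1236500600000_60020
    // The two names differ, so regions recorded against the first incarnation
    // are not silently treated as belonging to the restarted server.
  }
}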
@@ -42,6 +42,10 @@ Release 0.20.0 - Unreleased
HBASE-1169 When a shutdown is requested, stop scanning META regions immediately
HBASE-1251 HConnectionManager.getConnection(HBaseConfiguration) returns same
HConnection for different HBaseConfigurations
HBASE-1157, HBASE-1156 If we do not take start code as a part of region
server recovery, we could inadvertently try to reassign regions
assigned to a restarted server with a different start code;
Improve lease handling

IMPROVEMENTS
HBASE-1089 Add count of regions on filesystem to master UI; add percentage
@@ -23,7 +23,6 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JenkinsHash;
import org.apache.hadoop.io.VersionedWritable;
@@ -59,7 +58,7 @@ public class HRegionInfo extends VersionedWritable implements WritableComparable
private byte [] endKey = HConstants.EMPTY_BYTE_ARRAY;
private boolean offLine = false;
private long regionId = -1;
private byte [] regionName = HConstants.EMPTY_BYTE_ARRAY;
private transient byte [] regionName = HConstants.EMPTY_BYTE_ARRAY;
private String regionNameStr = "";
private boolean split = false;
private byte [] startKey = HConstants.EMPTY_BYTE_ARRAY;
@@ -221,6 +220,7 @@ public class HRegionInfo extends VersionedWritable implements WritableComparable
* Separate elements of a regionName.
* @param regionName
* @return Array of byte[] containing tableName, startKey and id
* @throws IOException
*/
public static byte [][] parseRegionName(final byte [] regionName)
throws IOException {
@@ -438,6 +438,8 @@ public class HRegionInfo extends VersionedWritable implements WritableComparable

/**
* For internal use in forcing splits ahead of file size limit.
* @param b
* @return previous value
*/
public boolean shouldSplit(boolean b) {
boolean old = this.splitRequest;
@@ -38,6 +38,7 @@ public class HServerInfo implements WritableComparable<HServerInfo> {
private long startCode;
private HServerLoad load;
private int infoPort;
private transient volatile String serverName = null;

/** default constructor - used by Writable */
public HServerInfo() {
@@ -85,15 +86,16 @@ public class HServerInfo implements WritableComparable<HServerInfo> {

/** @return the server address */
public HServerAddress getServerAddress() {
return serverAddress;
return new HServerAddress(serverAddress);
}

/**
* Change the server address.
* @param serverAddress New server address
*/
public void setServerAddress(HServerAddress serverAddress) {
public synchronized void setServerAddress(HServerAddress serverAddress) {
this.serverAddress = serverAddress;
this.serverName = null;
}

/** @return the server start code */
@@ -111,8 +113,19 @@ public class HServerInfo implements WritableComparable<HServerInfo> {
/**
* @param startCode the startCode to set
*/
public void setStartCode(long startCode) {
public synchronized void setStartCode(long startCode) {
this.startCode = startCode;
this.serverName = null;
}

/**
* @return the server name in the form hostname_startcode_port
*/
public synchronized String getServerName() {
if (this.serverName == null) {
this.serverName = getServerName(this.serverAddress, this.startCode);
}
return this.serverName;
}

@Override
@@ -128,10 +141,7 @@ public class HServerInfo implements WritableComparable<HServerInfo> {

@Override
public int hashCode() {
int result = this.serverAddress.hashCode();
result ^= this.infoPort;
result ^= this.startCode;
return result;
return this.getServerName().hashCode();
}


@@ -152,17 +162,46 @@ public class HServerInfo implements WritableComparable<HServerInfo> {
}

public int compareTo(HServerInfo o) {
int result = getServerAddress().compareTo(o.getServerAddress());
if (result != 0) {
return result;
return this.getServerName().compareTo(o.getServerName());
}

/**
* @param info
* @return the server name in the form hostname_startcode_port
*/
public static String getServerName(HServerInfo info) {
return getServerName(info.getServerAddress(), info.getStartCode());
}

/**
* @param serverAddress in the form hostname:port
* @param startCode
* @return the server name in the form hostname_startcode_port
*/
public static String getServerName(String serverAddress, long startCode) {
String name = null;
if (serverAddress != null) {
HServerAddress address = new HServerAddress(serverAddress);
name = getServerName(address.getHostname(), address.getPort(), startCode);
}
if (this.infoPort != o.infoPort) {
return this.infoPort - o.infoPort;
}
if (getStartCode() == o.getStartCode()) {
return 0;
}
// Startcodes are timestamps.
return (int)(getStartCode() - o.getStartCode());
return name;
}

/**
* @param address
* @param startCode
* @return the server name in the form hostname_startcode_port
*/
public static String getServerName(HServerAddress address, long startCode) {
return getServerName(address.getHostname(), address.getPort(), startCode);
}

private static String getServerName(String hostName, int port, long startCode) {
StringBuilder name = new StringBuilder(hostName);
name.append("_");
name.append(startCode);
name.append("_");
name.append(port);
return name.toString();
}
}
@@ -327,40 +327,40 @@ abstract class BaseScanner extends Chore implements HConstants {
}

protected void checkAssigned(final HRegionInfo info,
final String serverName, final long startCode)
final String serverAddress, final long startCode)
throws IOException {

synchronized (this.master.regionManager) {
/*
* We don't assign regions that are offline, in transition or were on
* a dead server. Regions that were on a dead server will get reassigned
* by ProcessServerShutdown
*/
if(info.isOffline() ||
this.master.regionManager.regionIsInTransition(info.getRegionName()) ||
this.master.serverManager.isDead(serverName)) {

return;
}
HServerInfo storedInfo = null;
if (serverName.length() != 0) {
String serverName = null;
if (serverAddress != null && serverAddress.length() > 0) {
serverName = HServerInfo.getServerName(serverAddress, startCode);
}
HServerInfo storedInfo = null;
synchronized (this.master.regionManager) {
if (serverName != null) {
/*
* We don't assign regions that are offline, in transition or were on
* a dead server. Regions that were on a dead server will get reassigned
* by ProcessServerShutdown
*/
if(info.isOffline() ||
this.master.regionManager.regionIsInTransition(
info.getRegionNameAsString()) ||
this.master.serverManager.isDead(serverName)) {

return;
}
storedInfo = this.master.serverManager.getServerInfo(serverName);
}

/*
* If the startcode is off -- either null or doesn't match the start code
* for the address -- then add it to the list of unassigned regions.
*/
if (storedInfo == null || storedInfo.getStartCode() != startCode) {
// If we can't find the HServerInfo, then add it to the list of
// unassigned regions.

if (storedInfo == null) {
// The current assignment is invalid
if (LOG.isDebugEnabled()) {
LOG.debug("Current assignment of " + info.getRegionNameAsString() +
" is not valid; " +
(storedInfo == null ? " Server '" + serverName + "' unknown." :
" serverInfo: " + storedInfo + ", passed startCode: " +
startCode + ", storedInfo.startCode: " +
storedInfo.getStartCode()));
" is not valid; " + " Server '" + serverAddress + "' startCode: " +
startCode + " unknown.");
}

// Recover the region server's log if there is one.
@@ -368,10 +368,9 @@ abstract class BaseScanner extends Chore implements HConstants {
// data in the meta region. Once we are on-line, dead server log
// recovery is handled by lease expiration and ProcessServerShutdown
if (!this.master.regionManager.isInitialMetaScanComplete() &&
serverName.length() != 0) {
StringBuilder dirName = new StringBuilder("log_");
dirName.append(serverName.replace(":", "_"));
Path logDir = new Path(this.master.rootdir, dirName.toString());
serverName != null) {
Path logDir =
new Path(this.master.rootdir, HLog.getHLogDirectoryName(serverName));
try {
if (master.fs.exists(logDir)) {
this.master.regionManager.splitLogLock.lock();
@@ -49,10 +49,9 @@ class ChangeTableState extends TableOperation {
}

@Override
protected void processScanItem(String serverName, long startCode,
HRegionInfo info) {
protected void processScanItem(String serverName, HRegionInfo info) {

if (isBeingServed(serverName, startCode)) {
if (isBeingServed(serverName)) {
HashSet<HRegionInfo> regions = servedRegions.get(serverName);
if (regions == null) {
regions = new HashSet<HRegionInfo>();
@@ -91,7 +90,7 @@ class ChangeTableState extends TableOperation {
synchronized (master.regionManager) {
if (online) {
// Bring offline regions on-line
if (!master.regionManager.regionIsOpening(i.getRegionName())) {
if (!master.regionManager.regionIsOpening(i.getRegionNameAsString())) {
master.regionManager.setUnassigned(i, false);
}
} else {
@@ -38,8 +38,9 @@ abstract class ColumnOperation extends TableOperation {
}

@Override
protected void processScanItem(String serverName, long startCode,
final HRegionInfo info) throws IOException {
protected void processScanItem(@SuppressWarnings("unused") String serverName,
final HRegionInfo info)
throws IOException {
if (isEnabled(info)) {
throw new TableNotDisabledException(tableName);
}
@@ -548,7 +548,8 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
/*
* HMasterRegionInterface
*/
public MapWritable regionServerStartup(final HServerInfo serverInfo) {
public MapWritable regionServerStartup(final HServerInfo serverInfo)
throws IOException {
// Set the address for now even tho it will not be persisted on HRS side.
String rsAddress = HBaseServer.getRemoteAddress();
serverInfo.setServerAddress(new HServerAddress(rsAddress,
@@ -57,7 +57,7 @@ class ModifyTableMeta extends TableOperation {
}

@Override
protected void processScanItem(String serverName, long startCode,
protected void processScanItem(@SuppressWarnings("unused") String serverName,
final HRegionInfo info) throws IOException {
if (isEnabled(info)) {
throw new TableNotDisabledException(tableName.toString());
@@ -34,8 +34,7 @@ import org.apache.hadoop.hbase.util.Bytes;
* root region which is handled specially.
*/
class ProcessRegionOpen extends ProcessRegionStatusChange {
protected final HServerAddress serverAddress;
protected final byte [] startCode;
protected final HServerInfo serverInfo;

/**
* @param master
@@ -44,21 +43,20 @@ class ProcessRegionOpen extends ProcessRegionStatusChange {
* @throws IOException
*/
@SuppressWarnings("unused")
public ProcessRegionOpen(HMaster master, HServerInfo info,
HRegionInfo regionInfo)
public ProcessRegionOpen(HMaster master, HServerInfo info,
HRegionInfo regionInfo)
throws IOException {
super(master, regionInfo);
this.serverAddress = info.getServerAddress();
if (this.serverAddress == null) {
throw new NullPointerException("Server address cannot be null; " +
if (info == null) {
throw new NullPointerException("HServerInfo cannot be null; " +
"hbase-958 debugging");
}
this.startCode = Bytes.toBytes(info.getStartCode());
this.serverInfo = info;
}

@Override
public String toString() {
return "PendingOpenOperation from " + serverAddress.toString();
return "PendingOpenOperation from " + HServerInfo.getServerName(serverInfo);
}

@Override
@@ -69,7 +67,7 @@ class ProcessRegionOpen extends ProcessRegionStatusChange {

public Boolean call() throws IOException {
LOG.info(regionInfo.getRegionNameAsString() + " open on " +
serverAddress.toString());
serverInfo.getServerAddress().toString());
if (!metaRegionAvailable()) {
// We can't proceed unless the meta region we are going to update
// is online. metaRegionAvailable() has put this operation on the
@@ -80,12 +78,13 @@ class ProcessRegionOpen extends ProcessRegionStatusChange {

// Register the newly-available Region's location.
LOG.info("updating row " + regionInfo.getRegionNameAsString() +
" in region " + Bytes.toString(metaRegionName) +
" with startcode " + Bytes.toLong(startCode) + " and server " +
serverAddress.toString());
" in region " + Bytes.toString(metaRegionName) + " with " +
" with startcode " + serverInfo.getStartCode() + " and server " +
serverInfo.getServerAddress());
BatchUpdate b = new BatchUpdate(regionInfo.getRegionName());
b.put(COL_SERVER, Bytes.toBytes(serverAddress.toString()));
b.put(COL_STARTCODE, startCode);
b.put(COL_SERVER,
Bytes.toBytes(serverInfo.getServerAddress().toString()));
b.put(COL_STARTCODE, Bytes.toBytes(serverInfo.getStartCode()));
server.batchUpdate(metaRegionName, b, -1L);
if (!this.historian.isOnline()) {
// This is safest place to do the onlining of the historian in
@@ -93,19 +92,24 @@ class ProcessRegionOpen extends ProcessRegionStatusChange {
// for the historian to go against.
this.historian.online(this.master.getConfiguration());
}
this.historian.addRegionOpen(regionInfo, serverAddress);
this.historian.addRegionOpen(regionInfo, serverInfo.getServerAddress());
synchronized (master.regionManager) {
if (isMetaTable) {
// It's a meta region.
MetaRegion m = new MetaRegion(new HServerAddress(serverAddress),
MetaRegion m =
new MetaRegion(new HServerAddress(serverInfo.getServerAddress()),
regionInfo.getRegionName(), regionInfo.getStartKey());
if (!master.regionManager.isInitialMetaScanComplete()) {
// Put it on the queue to be scanned for the first time.
LOG.debug("Adding " + m.toString() + " to regions to scan");
if (LOG.isDebugEnabled()) {
LOG.debug("Adding " + m.toString() + " to regions to scan");
}
master.regionManager.addMetaRegionToScan(m);
} else {
// Add it to the online meta regions
LOG.debug("Adding to onlineMetaRegions: " + m.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("Adding to onlineMetaRegions: " + m.toString());
}
master.regionManager.putMetaRegionOnline(m);
// Interrupting the Meta Scanner sleep so that it can
// process regions right away
@@ -28,7 +28,6 @@ import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
@@ -44,11 +43,7 @@ import org.apache.hadoop.hbase.io.RowResult;
* serving, and the regions need to get reassigned.
*/
class ProcessServerShutdown extends RegionServerOperation {
private final HServerAddress deadServer;
/*
* Cache of the server name.
*/
private final String deadServerStr;
private final String deadServer;
private final boolean rootRegionServer;
private boolean rootRegionReassigned = false;
private Path oldLogDir;
@@ -76,8 +71,7 @@ class ProcessServerShutdown extends RegionServerOperation {
public ProcessServerShutdown(HMaster master, HServerInfo serverInfo,
boolean rootRegionServer) {
super(master);
this.deadServer = serverInfo.getServerAddress();
this.deadServerStr = this.deadServer.toString();
this.deadServer = HServerInfo.getServerName(serverInfo);
this.rootRegionServer = rootRegionServer;
this.logSplit = false;
this.rootRescanned = false;
@@ -87,7 +81,7 @@ class ProcessServerShutdown extends RegionServerOperation {

@Override
public String toString() {
return "ProcessServerShutdown of " + this.deadServerStr;
return "ProcessServerShutdown of " + this.deadServer;
}

/** Finds regions that the dead region server was serving
@@ -116,8 +110,13 @@ class ProcessServerShutdown extends RegionServerOperation {
// shutdown server but that would mean that we'd reassign regions that
// were already out being assigned, ones that were product of a split
// that happened while the shutdown was being processed.
String serverName = Writables.cellToString(values.get(COL_SERVER));
if (serverName == null || !deadServerStr.equals(serverName)) {
String serverAddress = Writables.cellToString(values.get(COL_SERVER));
long startCode = Writables.cellToLong(values.get(COL_STARTCODE));
String serverName = null;
if (serverAddress != null && serverAddress.length() > 0) {
serverName = HServerInfo.getServerName(serverAddress, startCode);
}
if (serverName == null || !deadServer.equals(serverName)) {
// This isn't the server you're looking for - move along
continue;
}
@@ -146,7 +145,7 @@ class ProcessServerShutdown extends RegionServerOperation {
ToDoEntry todo = new ToDoEntry(row, info);
toDoList.add(todo);

if (master.regionManager.isOfflined(info.getRegionName()) ||
if (master.regionManager.isOfflined(info.getRegionNameAsString()) ||
info.isOffline()) {
master.regionManager.removeRegion(info);
// Mark region offline
@@ -232,7 +231,7 @@ class ProcessServerShutdown extends RegionServerOperation {

@Override
protected boolean process() throws IOException {
LOG.info("process shutdown of server " + this.deadServerStr +
LOG.info("process shutdown of server " + this.deadServer +
": logSplit: " +
logSplit + ", rootRescanned: " + rootRescanned +
", numberOfMetaRegions: " +
@@ -310,9 +309,9 @@ class ProcessServerShutdown extends RegionServerOperation {
}
}
// Remove this server from dead servers list. Finished splitting logs.
this.master.serverManager.removeDeadServer(deadServerStr);
this.master.serverManager.removeDeadServer(deadServer);
if (LOG.isDebugEnabled()) {
LOG.debug("Removed " + deadServerStr + " from deadservers Map");
LOG.debug("Removed " + deadServer + " from deadservers Map");
}
return true;
}
@@ -96,9 +96,8 @@ class RegionManager implements HConstants {
*
* @see RegionState inner-class below
*/
private final SortedMap<byte[], RegionState> regionsInTransition =
Collections.synchronizedSortedMap(
new TreeMap<byte[], RegionState>(Bytes.BYTES_COMPARATOR));
private final SortedMap<String, RegionState> regionsInTransition =
Collections.synchronizedSortedMap(new TreeMap<String, RegionState>());

// How many regions to assign a server at a time.
private final int maxAssignInOneGo;
@@ -108,25 +107,25 @@ class RegionManager implements HConstants {
private final float slop;

/** Set of regions to split. */
private final SortedMap<byte[], Pair<HRegionInfo,HServerAddress>> regionsToSplit =
Collections.synchronizedSortedMap(
new TreeMap<byte[],Pair<HRegionInfo,HServerAddress>>
(Bytes.BYTES_COMPARATOR));
private final SortedMap<byte[], Pair<HRegionInfo,HServerAddress>>
regionsToSplit = Collections.synchronizedSortedMap(
new TreeMap<byte[],Pair<HRegionInfo,HServerAddress>>
(Bytes.BYTES_COMPARATOR));
/** Set of regions to compact. */
private final SortedMap<byte[], Pair<HRegionInfo,HServerAddress>> regionsToCompact =
Collections.synchronizedSortedMap(
new TreeMap<byte[],Pair<HRegionInfo,HServerAddress>>
(Bytes.BYTES_COMPARATOR));
private final SortedMap<byte[], Pair<HRegionInfo,HServerAddress>>
regionsToCompact = Collections.synchronizedSortedMap(
new TreeMap<byte[],Pair<HRegionInfo,HServerAddress>>
(Bytes.BYTES_COMPARATOR));
/** Set of regions to major compact. */
private final SortedMap<byte[], Pair<HRegionInfo,HServerAddress>> regionsToMajorCompact =
Collections.synchronizedSortedMap(
new TreeMap<byte[],Pair<HRegionInfo,HServerAddress>>
(Bytes.BYTES_COMPARATOR));
private final SortedMap<byte[], Pair<HRegionInfo,HServerAddress>>
regionsToMajorCompact = Collections.synchronizedSortedMap(
new TreeMap<byte[],Pair<HRegionInfo,HServerAddress>>
(Bytes.BYTES_COMPARATOR));
/** Set of regions to flush. */
private final SortedMap<byte[], Pair<HRegionInfo,HServerAddress>> regionsToFlush =
Collections.synchronizedSortedMap(
new TreeMap<byte[],Pair<HRegionInfo,HServerAddress>>
(Bytes.BYTES_COMPARATOR));
private final SortedMap<byte[], Pair<HRegionInfo,HServerAddress>>
regionsToFlush = Collections.synchronizedSortedMap(
new TreeMap<byte[],Pair<HRegionInfo,HServerAddress>>
(Bytes.BYTES_COMPARATOR));

private final ZooKeeperWrapper zooKeeperWrapper;
private final int zooKeeperNumRetries;
@@ -163,7 +162,8 @@ class RegionManager implements HConstants {
void unsetRootRegion() {
synchronized (regionsInTransition) {
rootRegionLocation.set(null);
regionsInTransition.remove(HRegionInfo.ROOT_REGIONINFO.getRegionName());
regionsInTransition.remove(
HRegionInfo.ROOT_REGIONINFO.getRegionNameAsString());
}
}

@@ -173,7 +173,8 @@ class RegionManager implements HConstants {
synchronized (regionsInTransition) {
RegionState s = new RegionState(HRegionInfo.ROOT_REGIONINFO);
s.setUnassigned();
regionsInTransition.put(HRegionInfo.ROOT_REGIONINFO.getRegionName(), s);
regionsInTransition.put(
HRegionInfo.ROOT_REGIONINFO.getRegionNameAsString(), s);
}
}
}
@@ -184,11 +185,11 @@ class RegionManager implements HConstants {
* (ServerManager.processMsgs) already owns the monitor for the RegionManager.
*
* @param info
* @param serverName
* @param mostLoadedRegions
* @param returnMsgs
*/
void assignRegions(HServerInfo info, String serverName,
HRegionInfo[] mostLoadedRegions, ArrayList<HMsg> returnMsgs) {
void assignRegions(HServerInfo info, HRegionInfo[] mostLoadedRegions,
ArrayList<HMsg> returnMsgs) {
HServerLoad thisServersLoad = info.getLoad();
// figure out what regions need to be assigned and aren't currently being
// worked on elsewhere.
@@ -204,24 +205,24 @@ class RegionManager implements HConstants {
if (avgLoad > 2.0 &&
thisServersLoad.getNumberOfRegions() > avgLoadWithSlop) {
if (LOG.isDebugEnabled()) {
LOG.debug("Server " + serverName +
LOG.debug("Server " + info.getServerName() +
" is overloaded. Server load: " +
thisServersLoad.getNumberOfRegions() + " avg: " + avgLoad +
", slop: " + this.slop);
}
unassignSomeRegions(serverName, thisServersLoad,
unassignSomeRegions(info, thisServersLoad,
avgLoad, mostLoadedRegions, returnMsgs);
}
}
} else {
// if there's only one server, just give it all the regions
if (master.serverManager.numServers() == 1) {
assignRegionsToOneServer(regionsToAssign, serverName, returnMsgs);
assignRegionsToOneServer(regionsToAssign, info, returnMsgs);
} else {
// otherwise, give this server a few regions taking into account the
// load of all the other servers.
assignRegionsToMultipleServers(thisServersLoad, regionsToAssign,
serverName, returnMsgs);
info, returnMsgs);
}
}
}
@@ -234,7 +235,7 @@ class RegionManager implements HConstants {
* whose caller owns the monitor for RegionManager
*/
private void assignRegionsToMultipleServers(final HServerLoad thisServersLoad,
final Set<RegionState> regionsToAssign, final String serverName,
final Set<RegionState> regionsToAssign, final HServerInfo info,
final ArrayList<HMsg> returnMsgs) {

int nRegionsToAssign = regionsToAssign.size();
@@ -281,9 +282,10 @@ class RegionManager implements HConstants {

for (RegionState s: regionsToAssign) {
LOG.info("assigning region " + Bytes.toString(s.getRegionName())+
" to server " + serverName);
s.setPendingOpen(serverName);
this.historian.addRegionAssignment(s.getRegionInfo(), serverName);
" to server " + info.getServerName());
s.setPendingOpen(info.getServerName());
this.historian.addRegionAssignment(s.getRegionInfo(),
info.getServerName());
returnMsgs.add(
new HMsg(HMsg.Type.MSG_REGION_OPEN, s.getRegionInfo(), inSafeMode()));
if (--nregions <= 0) {
@@ -406,12 +408,12 @@ class RegionManager implements HConstants {
* @param returnMsgs
*/
private void assignRegionsToOneServer(final Set<RegionState> regionsToAssign,
final String serverName, final ArrayList<HMsg> returnMsgs) {
final HServerInfo info, final ArrayList<HMsg> returnMsgs) {
for (RegionState s: regionsToAssign) {
LOG.info("assigning region " + Bytes.toString(s.getRegionName()) +
" to the only server " + serverName);
s.setPendingOpen(serverName);
this.historian.addRegionAssignment(s.getRegionInfo(), serverName);
" to the only server " + info.getServerName());
s.setPendingOpen(info.getServerName());
this.historian.addRegionAssignment(s.getRegionInfo(), info.getServerName());
returnMsgs.add(
new HMsg(HMsg.Type.MSG_REGION_OPEN, s.getRegionInfo(), inSafeMode()));
}
@@ -425,7 +427,7 @@ class RegionManager implements HConstants {
* Note that no synchronization is needed because the only caller
* (assignRegions) whose caller owns the monitor for RegionManager
*/
private void unassignSomeRegions(final String serverName,
private void unassignSomeRegions(final HServerInfo info,
final HServerLoad load, final double avgLoad,
final HRegionInfo[] mostLoadedRegions, ArrayList<HMsg> returnMsgs) {
int numRegionsToClose = load.getNumberOfRegions() - (int)Math.ceil(avgLoad);
@@ -443,19 +445,20 @@ class RegionManager implements HConstants {
if (currentRegion.isRootRegion() || currentRegion.isMetaTable()) {
continue;
}
byte[] regionName = currentRegion.getRegionName();
String regionName = currentRegion.getRegionNameAsString();
if (regionIsInTransition(regionName)) {
skipped++;
continue;
}
LOG.debug("Going to close region " +
currentRegion.getRegionNameAsString());
if (LOG.isDebugEnabled()) {
LOG.debug("Going to close region " + regionName);
}
// make a message to close the region
returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_CLOSE, currentRegion,
OVERLOADED, inSafeMode()));
// mark the region as closing
setClosing(serverName, currentRegion, false);
setPendingClose(currentRegion.getRegionName());
setClosing(info.getServerName(), currentRegion, false);
setPendingClose(regionName);
// increment the count of regions we've marked
regionsClosed++;
}
@@ -716,14 +719,14 @@ class RegionManager implements HConstants {
* @param info
*/
public void removeRegion(HRegionInfo info) {
regionsInTransition.remove(info.getRegionName());
regionsInTransition.remove(info.getRegionNameAsString());
}

/**
* @param regionName
* @return true if the named region is in a transition state
*/
public boolean regionIsInTransition(byte[] regionName) {
public boolean regionIsInTransition(String regionName) {
return regionsInTransition.containsKey(regionName);
}

@@ -731,7 +734,7 @@ class RegionManager implements HConstants {
* @param regionName
* @return true if the region is unassigned, pendingOpen or open
*/
public boolean regionIsOpening(byte[] regionName) {
public boolean regionIsOpening(String regionName) {
RegionState state = regionsInTransition.get(regionName);
if (state != null) {
return state.isOpening();
@@ -746,10 +749,10 @@ class RegionManager implements HConstants {
*/
public void setUnassigned(HRegionInfo info, boolean force) {
synchronized(this.regionsInTransition) {
RegionState s = regionsInTransition.get(info.getRegionName());
RegionState s = regionsInTransition.get(info.getRegionNameAsString());
if (s == null) {
s = new RegionState(info);
regionsInTransition.put(info.getRegionName(), s);
regionsInTransition.put(info.getRegionNameAsString(), s);
}
if (force || (!s.isPendingOpen() && !s.isOpen())) {
s.setUnassigned();
@@ -766,7 +769,7 @@ class RegionManager implements HConstants {
*/
public boolean isUnassigned(HRegionInfo info) {
synchronized (regionsInTransition) {
RegionState s = regionsInTransition.get(info.getRegionName());
RegionState s = regionsInTransition.get(info.getRegionNameAsString());
if (s != null) {
return s.isUnassigned();
}
@@ -781,7 +784,7 @@ class RegionManager implements HConstants {
* @param regionName name of the region
* @return true if open, false otherwise
*/
public boolean isPendingOpen(byte [] regionName) {
public boolean isPendingOpen(String regionName) {
synchronized (regionsInTransition) {
RegionState s = regionsInTransition.get(regionName);
if (s != null) {
@@ -795,7 +798,7 @@ class RegionManager implements HConstants {
* Region has been assigned to a server and the server has told us it is open
* @param regionName
*/
public void setOpen(byte [] regionName) {
public void setOpen(String regionName) {
synchronized (regionsInTransition) {
RegionState s = regionsInTransition.get(regionName);
if (s != null) {
@@ -808,7 +811,7 @@ class RegionManager implements HConstants {
* @param regionName
* @return true if region is marked to be offlined.
*/
public boolean isOfflined(byte[] regionName) {
public boolean isOfflined(String regionName) {
synchronized (regionsInTransition) {
RegionState s = regionsInTransition.get(regionName);
if (s != null) {
@@ -827,12 +830,13 @@ class RegionManager implements HConstants {
public void setClosing(final String serverName, final HRegionInfo regionInfo,
final boolean setOffline) {
synchronized (this.regionsInTransition) {
RegionState s = this.regionsInTransition.get(regionInfo.getRegionName());
RegionState s =
this.regionsInTransition.get(regionInfo.getRegionNameAsString());
if (s == null) {
s = new RegionState(regionInfo);
}
s.setClosing(serverName, setOffline);
this.regionsInTransition.put(regionInfo.getRegionName(), s);
this.regionsInTransition.put(regionInfo.getRegionNameAsString(), s);
}
}

@@ -861,7 +865,7 @@ class RegionManager implements HConstants {
*
* @param regionName
*/
public void setPendingClose(byte[] regionName) {
public void setPendingClose(String regionName) {
synchronized (regionsInTransition) {
RegionState s = regionsInTransition.get(regionName);
if (s != null) {
@@ -873,7 +877,7 @@ class RegionManager implements HConstants {
/**
* @param regionName
*/
public void setClosed(byte[] regionName) {
public void setClosed(String regionName) {
synchronized (regionsInTransition) {
RegionState s = regionsInTransition.get(regionName);
if (s != null) {
@@ -1117,8 +1121,7 @@ class RegionManager implements HConstants {
SortedMap<byte[], Pair<HRegionInfo,HServerAddress>> map,
final HMsg.Type msg) {
HServerAddress addr = serverInfo.getServerAddress();
Iterator<Pair<HRegionInfo, HServerAddress>> i =
map.values().iterator();
Iterator<Pair<HRegionInfo, HServerAddress>> i = map.values().iterator();
synchronized (map) {
while (i.hasNext()) {
Pair<HRegionInfo,HServerAddress> pair = i.next();
@@ -20,7 +20,6 @@
package org.apache.hadoop.hbase.master;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -41,6 +40,7 @@ import org.apache.hadoop.hbase.HMsg;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.Leases;
import org.apache.hadoop.hbase.HMsg.Type;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.zookeeper.WatchedEvent;
@@ -71,11 +71,13 @@ class ServerManager implements HConstants {

/**
* Set of known dead servers. On znode expiration, servers are added here.
* Boolean holds whether its logs have been split or not. Initially set to
* false.
* This is needed in case of a network partitioning where the server's lease
* expires, but the server is still running. After the network is healed,
* and it's server logs are recovered, it will be told to call server startup
* because by then, its regions have probably been reassigned.
*/
private final Map<String, Boolean> deadServers =
new ConcurrentHashMap<String, Boolean>();
private final Set<String> deadServers =
Collections.synchronizedSet(new HashSet<String>());

/** SortedMap server load -> Set of server names */
final SortedMap<HServerLoad, Set<String>> loadToServers =
@@ -108,64 +110,37 @@ class ServerManager implements HConstants {
getInt("hbase.regions.nobalancing.count", 4);
}

/**
* Look to see if we have ghost references to this regionserver such as
* if regionserver is on the dead servers list getting its logs processed.
* @param serverInfo
* @return True if still ghost references and we have not been able to clear
* them or the server is shutting down.
*/
private boolean checkForGhostReferences(final HServerInfo serverInfo) {
boolean result = false;
for (long sleepTime = -1; !master.closed.get() && !result;) {
if (sleepTime != -1) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
// Continue
}
}
// May be on list of dead servers. If so, wait till we've cleared it.
String addr = serverInfo.getServerAddress().toString();
if (isDead(addr)) {
LOG.debug("Waiting on " + addr + " removal from dead list before " +
"processing report-for-duty request");
sleepTime = this.master.threadWakeFrequency;
continue;
}
result = true;
}
return result;
}

/**
* Let the server manager know a new regionserver has come online
* @param serverInfo
* @throws Leases.LeaseStillHeldException
*/
public void regionServerStartup(final HServerInfo serverInfo) {
String s = serverInfo.getServerAddress().toString().trim();
Watcher watcher = new ServerExpirer(serverInfo.getServerAddress()
.toString().trim());
zooKeeperWrapper.updateRSLocationGetWatch(serverInfo, watcher);

LOG.info("Received start message from: " + s);
if (!checkForGhostReferences(serverInfo)) {
return;
public void regionServerStartup(final HServerInfo serverInfo)
throws Leases.LeaseStillHeldException {
HServerInfo info = new HServerInfo(serverInfo);
String serverName = HServerInfo.getServerName(info);
if (serversToServerInfo.containsKey(serverName) ||
deadServers.contains(serverName)) {
throw new Leases.LeaseStillHeldException(serverName);
}
Watcher watcher = new ServerExpirer(serverName);
zooKeeperWrapper.updateRSLocationGetWatch(info, watcher);

LOG.info("Received start message from: " + serverName);
// Go on to process the regionserver registration.
HServerLoad load = serversToLoad.remove(s);
HServerLoad load = serversToLoad.remove(serverName);
if (load != null) {
// The startup message was from a known server.
// Remove stale information about the server's load.
synchronized (loadToServers) {
Set<String> servers = loadToServers.get(load);
if (servers != null) {
servers.remove(s);
servers.remove(serverName);
loadToServers.put(load, servers);
}
}
}
HServerInfo storedInfo = serversToServerInfo.remove(s);
HServerInfo storedInfo = serversToServerInfo.remove(serverName);
if (storedInfo != null && !master.closed.get()) {
// The startup message was from a known server with the same name.
// Timeout the old one right away.
@@ -184,15 +159,15 @@ class ServerManager implements HConstants {
}
// record new server
load = new HServerLoad();
serverInfo.setLoad(load);
serversToServerInfo.put(s, serverInfo);
serversToLoad.put(s, load);
info.setLoad(load);
serversToServerInfo.put(serverName, info);
serversToLoad.put(serverName, load);
synchronized (loadToServers) {
Set<String> servers = loadToServers.get(load);
if (servers == null) {
servers = new HashSet<String>();
}
servers.add(s);
servers.add(serverName);
loadToServers.put(load, servers);
}
}
@@ -213,13 +188,16 @@ class ServerManager implements HConstants {
public HMsg [] regionServerReport(final HServerInfo serverInfo,
final HMsg msgs[], final HRegionInfo[] mostLoadedRegions)
throws IOException {
String serverName = serverInfo.getServerAddress().toString().trim();
HServerInfo info = new HServerInfo(serverInfo);
if (isDead(info.getServerName())) {
throw new Leases.LeaseStillHeldException(info.getServerName());
}
if (msgs.length > 0) {
if (msgs[0].isType(HMsg.Type.MSG_REPORT_EXITING)) {
processRegionServerExit(serverName, msgs);
processRegionServerExit(info, msgs);
return EMPTY_HMSG_ARRAY;
} else if (msgs[0].isType(HMsg.Type.MSG_REPORT_QUIESCED)) {
LOG.info("Region server " + serverName + " quiesced");
LOG.info("Region server " + info.getServerName() + " quiesced");
quiescedServers.incrementAndGet();
}
}
@@ -251,16 +229,17 @@ class ServerManager implements HConstants {
return new HMsg [] {REGIONSERVER_STOP};
}

HServerInfo storedInfo = serversToServerInfo.get(serverName);
HServerInfo storedInfo = serversToServerInfo.get(info.getServerName());
if (storedInfo == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("received server report from unknown server: " + serverName);
LOG.debug("received server report from unknown server: " +
info.getServerName());
}

// The HBaseMaster may have been restarted.
// Tell the RegionServer to start over and call regionServerStartup()
return new HMsg[]{CALL_SERVER_STARTUP};
} else if (storedInfo.getStartCode() != serverInfo.getStartCode()) {
} else if (storedInfo.getStartCode() != info.getStartCode()) {
// This state is reachable if:
//
// 1) RegionServer A started
@@ -271,35 +250,37 @@ class ServerManager implements HConstants {
// The answer is to ask A to shut down for good.

if (LOG.isDebugEnabled()) {
LOG.debug("region server race condition detected: " + serverName);
LOG.debug("region server race condition detected: " +
info.getServerName());
}

synchronized (serversToServerInfo) {
removeServerInfo(serverName);
removeServerInfo(info.getServerName());
serversToServerInfo.notifyAll();
}

return new HMsg[]{REGIONSERVER_STOP};
} else {
return processRegionServerAllsWell(serverName, serverInfo,
mostLoadedRegions, msgs);
return processRegionServerAllsWell(info, mostLoadedRegions, msgs);
}
}

/** Region server is exiting */
private void processRegionServerExit(String serverName, HMsg[] msgs) {
private void processRegionServerExit(HServerInfo serverInfo, HMsg[] msgs) {
synchronized (serversToServerInfo) {
try {
// HRegionServer is shutting down.
if (removeServerInfo(serverName)) {
if (removeServerInfo(serverInfo.getServerName())) {
// Only process the exit message if the server still has registered info.
// Otherwise we could end up processing the server exit twice.
LOG.info("Region server " + serverName + ": MSG_REPORT_EXITING");
LOG.info("Region server " + serverInfo.getServerName() +
": MSG_REPORT_EXITING");
// Get all the regions the server was serving reassigned
// (if we are not shutting down).
if (!master.closed.get()) {
for (int i = 1; i < msgs.length; i++) {
LOG.info("Processing " + msgs[i] + " from " + serverName);
LOG.info("Processing " + msgs[i] + " from " +
serverInfo.getServerName());
HRegionInfo info = msgs[i].getRegionInfo();
synchronized (master.regionManager) {
if (info.isRootRegion()) {
@@ -308,7 +289,8 @@ class ServerManager implements HConstants {
if (info.isMetaTable()) {
master.regionManager.offlineMetaRegion(info.getStartKey());
}
if (!master.regionManager.isOfflined(info.getRegionName())) {
if (!master.regionManager.isOfflined(
info.getRegionNameAsString())) {
master.regionManager.setUnassigned(info, true);
} else {
master.regionManager.removeRegion(info);
@@ -328,21 +310,19 @@ class ServerManager implements HConstants {

/**
* RegionServer is checking in, no exceptional circumstances
* @param serverName
* @param serverInfo
* @param mostLoadedRegions
* @param msgs
* @return
* @throws IOException
*/
private HMsg[] processRegionServerAllsWell(String serverName,
HServerInfo serverInfo, HRegionInfo[] mostLoadedRegions, HMsg[] msgs)
private HMsg[] processRegionServerAllsWell(HServerInfo serverInfo, HRegionInfo[] mostLoadedRegions, HMsg[] msgs)
throws IOException {

// Refresh the info object and the load information
serversToServerInfo.put(serverName, serverInfo);
serversToServerInfo.put(serverInfo.getServerName(), serverInfo);

HServerLoad load = serversToLoad.get(serverName);
HServerLoad load = serversToLoad.get(serverInfo.getServerName());
if (load != null) {
this.master.getMetrics().incrementRequests(load.getNumberOfRequests());
if (!load.equals(serverInfo.getLoad())) {
@@ -352,7 +332,7 @@ class ServerManager implements HConstants {
Set<String> servers = loadToServers.get(load);
// Note that servers should never be null because loadToServers
// and serversToLoad are manipulated in pairs
servers.remove(serverName);
servers.remove(serverInfo.getServerName());
loadToServers.put(load, servers);
}
}
@@ -360,18 +340,18 @@ class ServerManager implements HConstants {

// Set the current load information
load = serverInfo.getLoad();
serversToLoad.put(serverName, load);
serversToLoad.put(serverInfo.getServerName(), load);
synchronized (loadToServers) {
Set<String> servers = loadToServers.get(load);
if (servers == null) {
servers = new HashSet<String>();
}
servers.add(serverName);
servers.add(serverInfo.getServerName());
loadToServers.put(load, servers);
}

// Next, process messages for this server
return processMsgs(serverName, serverInfo, mostLoadedRegions, msgs);
return processMsgs(serverInfo, mostLoadedRegions, msgs);
}

/**
@@ -380,8 +360,8 @@ class ServerManager implements HConstants {
* Note that we never need to update the server's load information because
* that has already been done in regionServerReport.
*/
private HMsg[] processMsgs(String serverName, HServerInfo serverInfo,
HRegionInfo[] mostLoadedRegions, HMsg incomingMsgs[])
private HMsg[] processMsgs(HServerInfo serverInfo,
HRegionInfo[] mostLoadedRegions, HMsg incomingMsgs[])
throws IOException {
ArrayList<HMsg> returnMsgs = new ArrayList<HMsg>();
if (serverInfo.getServerAddress() == null) {
@@ -392,14 +372,15 @@ class ServerManager implements HConstants {
int openingCount = 0;
for (int i = 0; i < incomingMsgs.length; i++) {
HRegionInfo region = incomingMsgs[i].getRegionInfo();
LOG.info("Received " + incomingMsgs[i] + " from " + serverName);
LOG.info("Received " + incomingMsgs[i] + " from " +
serverInfo.getServerName());
switch (incomingMsgs[i].getType()) {
case MSG_REPORT_PROCESS_OPEN:
openingCount++;
break;

case MSG_REPORT_OPEN:
processRegionOpen(serverName, serverInfo, region, returnMsgs);
processRegionOpen(serverInfo, region, returnMsgs);
break;

case MSG_REPORT_CLOSE:
@@ -407,8 +388,8 @@ class ServerManager implements HConstants {
break;

case MSG_REPORT_SPLIT:
processSplitRegion(serverName, serverInfo, region, incomingMsgs[++i],
incomingMsgs[++i], returnMsgs);
processSplitRegion(region, incomingMsgs[++i], incomingMsgs[++i],
returnMsgs);
break;

default:
@@ -420,11 +401,12 @@ class ServerManager implements HConstants {

synchronized (master.regionManager) {
// Tell the region server to close regions that we have marked for closing.
for (HRegionInfo i: master.regionManager.getMarkedToClose(serverName)) {
for (HRegionInfo i:
master.regionManager.getMarkedToClose(serverInfo.getServerName())) {
returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_CLOSE, i,
master.regionManager.inSafeMode()));
// Transition the region from toClose to closing state
master.regionManager.setPendingClose(i.getRegionName());
master.regionManager.setPendingClose(i.getRegionNameAsString());
}

// Figure out what the RegionServer ought to do, and write back.
@@ -432,8 +414,8 @@ class ServerManager implements HConstants {
// Should we tell it close regions because its overloaded? If its
// currently opening regions, leave it alone till all are open.
if (openingCount < this.nobalancingCount) {
this.master.regionManager.assignRegions(serverInfo, serverName,
mostLoadedRegions, returnMsgs);
this.master.regionManager.assignRegions(serverInfo, mostLoadedRegions,
returnMsgs);
}
// Send any pending table actions.
this.master.regionManager.applyActions(serverInfo, returnMsgs);
@@ -444,15 +426,13 @@ class ServerManager implements HConstants {
/**
* A region has split.
*
* @param serverName
* @param serverInfo
* @param region
* @param splitA
* @param splitB
* @param returnMsgs
*/
private void processSplitRegion(String serverName, HServerInfo serverInfo,
HRegionInfo region, HMsg splitA, HMsg splitB, ArrayList<HMsg> returnMsgs) {
private void processSplitRegion(HRegionInfo region, HMsg splitA, HMsg splitB,
ArrayList<HMsg> returnMsgs) {

synchronized (master.regionManager) {
// Cancel any actions pending for the affected region.
@@ -475,18 +455,18 @@ class ServerManager implements HConstants {
}

/** Region server is reporting that a region is now opened */
private void processRegionOpen(String serverName, HServerInfo serverInfo,
private void processRegionOpen(HServerInfo serverInfo,
HRegionInfo region, ArrayList<HMsg> returnMsgs)
throws IOException {
boolean duplicateAssignment = false;
synchronized (master.regionManager) {
if (!master.regionManager.isUnassigned(region) &&
!master.regionManager.isPendingOpen(region.getRegionName())) {
!master.regionManager.isPendingOpen(region.getRegionNameAsString())) {
if (region.isRootRegion()) {
// Root region
HServerAddress rootServer = master.getRootRegionLocation();
if (rootServer != null) {
if (rootServer.toString().compareTo(serverName) == 0) {
if (rootServer.compareTo(serverInfo.getServerAddress()) == 0) {
// A duplicate open report from the correct server
return;
}
@@ -498,7 +478,8 @@ class ServerManager implements HConstants {
// Not root region. If it is not a pending region, then we are
// going to treat it as a duplicate assignment, although we can't
// tell for certain that's the case.
if (master.regionManager.isPendingOpen(region.getRegionName())) {
if (master.regionManager.isPendingOpen(
region.getRegionNameAsString())) {
// A duplicate report from the correct server
return;
}
@@ -535,7 +516,7 @@ class ServerManager implements HConstants {
} else {
// Note that the table has been assigned and is waiting for the
// meta table to be updated.
master.regionManager.setOpen(region.getRegionName());
master.regionManager.setOpen(region.getRegionNameAsString());
// Queue up an update to note the region location.
try {
master.toDoQueue.put(
@@ -567,7 +548,7 @@ class ServerManager implements HConstants {
}

boolean offlineRegion =
master.regionManager.isOfflined(region.getRegionName());
master.regionManager.isOfflined(region.getRegionNameAsString());
boolean reassignRegion = !region.isOffline() && !offlineRegion;

// NOTE: If the region was just being closed and not offlined, we cannot
@@ -575,7 +556,7 @@ class ServerManager implements HConstants {
// the messages we've received. In this case, a close could be
// processed before an open resulting in the master not agreeing on
// the region's state.
master.regionManager.setClosed(region.getRegionName());
master.regionManager.setClosed(region.getRegionNameAsString());
try {
master.toDoQueue.put(new ProcessRegionClose(master, region,
offlineRegion, reassignRegion));
@@ -648,11 +629,11 @@ class ServerManager implements HConstants {
}

/**
* @param address server address
* @param name server name
* @return HServerInfo for the given server address
*/
public HServerInfo getServerInfo(String address) {
return serversToServerInfo.get(address);
public HServerInfo getServerInfo(String name) {
return serversToServerInfo.get(name);
}

/**
@@ -660,7 +641,7 @@ class ServerManager implements HConstants {
*/
public Map<String, HServerInfo> getServersToServerInfo() {
synchronized (serversToServerInfo) {
return new HashMap<String, HServerInfo>(serversToServerInfo);
return Collections.unmodifiableMap(serversToServerInfo);
}
}

@@ -669,16 +650,7 @@ class ServerManager implements HConstants {
*/
public Map<String, HServerLoad> getServersToLoad() {
synchronized (serversToLoad) {
return new HashMap<String, HServerLoad>(serversToLoad);
}
}

/**
* @return Read-only map of load to servers.
*/
public Map<HServerLoad, Set<String>> getLoadToServers() {
synchronized (loadToServers) {
return new HashMap<HServerLoad, Set<String>>(loadToServers);
return Collections.unmodifiableMap(serversToLoad);
}
}

@@ -740,7 +712,7 @@ class ServerManager implements HConstants {
master.regionManager.unsetRootRegion();
rootServer = true;
}
String serverName = info.getServerAddress().toString();
String serverName = HServerInfo.getServerName(info);
HServerLoad load = serversToLoad.remove(serverName);
if (load != null) {
synchronized (loadToServers) {
@@ -751,7 +723,7 @@ class ServerManager implements HConstants {
}
}
}
deadServers.put(server, Boolean.FALSE);
deadServers.add(server);
try {
master.toDoQueue.put(new ProcessServerShutdown(master, info,
rootServer));
@@ -778,6 +750,6 @@ class ServerManager implements HConstants {
* @return true if server is dead
*/
public boolean isDead(String serverName) {
return deadServers.containsKey(serverName);
return deadServers.contains(serverName);
}
}
@@ -41,8 +41,8 @@ class TableDelete extends TableOperation {
}

@Override
protected void processScanItem(String serverName,
long startCode, final HRegionInfo info) throws IOException {
protected void processScanItem(@SuppressWarnings("unused") String serverName,
final HRegionInfo info) throws IOException {

if (isEnabled(info)) {
throw new TableNotDisabledException(tableName);
@@ -92,17 +92,21 @@ abstract class TableOperation implements HConstants {
LOG.error(COL_REGIONINFO + " not found on " + values.getRow());
continue;
}
String serverName = Writables.cellToString(values.get(COL_SERVER));
long startCode = Writables.cellToLong(values.get(COL_STARTCODE));
String serverAddress = Writables.cellToString(values.get(COL_SERVER));
long startCode = Writables.cellToLong(values.get(COL_STARTCODE));
String serverName = null;
if (serverAddress != null && serverAddress.length() > 0) {
serverName = HServerInfo.getServerName(serverAddress, startCode);
}
if (Bytes.compareTo(info.getTableDesc().getName(), tableName) > 0) {
break; // Beyond any more entries for this table
}

tableExists = true;
if (!isBeingServed(serverName, startCode) || !isEnabled(info)) {
if (!isBeingServed(serverName) || !isEnabled(info)) {
unservedRegions.add(info);
}
processScanItem(serverName, startCode, info);
processScanItem(serverName, info);
}
} finally {
if (scannerId != -1L) {
@@ -145,11 +149,11 @@ abstract class TableOperation implements HConstants {
}
}

protected boolean isBeingServed(String serverName, long startCode) {
protected boolean isBeingServed(String serverName) {
boolean result = false;
if (serverName != null && serverName.length() > 0 && startCode != -1L) {
if (serverName != null && serverName.length() > 0) {
HServerInfo s = master.serverManager.getServerInfo(serverName);
result = s != null && s.getStartCode() == startCode;
result = s != null;
}
return result;
}
@@ -158,8 +162,8 @@ abstract class TableOperation implements HConstants {
return !info.isOffline();
}

protected abstract void processScanItem(String serverName, long startCode,
HRegionInfo info) throws IOException;
protected abstract void processScanItem(String serverName, HRegionInfo info)
throws IOException;

protected abstract void postProcessMeta(MetaRegion m,
HRegionInterface server) throws IOException;
@@ -22,8 +22,6 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Collections;
import java.util.Map;
import java.util.SortedMap;
@@ -843,18 +841,35 @@ public class HLog implements HConstants, Syncable {
* @return the HLog directory name
*/
public static String getHLogDirectoryName(HServerInfo info) {
StringBuilder dirName = new StringBuilder("log_");
try {
dirName.append(URLEncoder.encode(
info.getServerAddress().getBindAddress(), UTF8_ENCODING));
} catch (UnsupportedEncodingException e) {
LOG.error("Error encoding '" + info.getServerAddress().getBindAddress()
+ "'", e);
return getHLogDirectoryName(HServerInfo.getServerName(info));
}

/**
* Construct the HLog directory name
*
* @param serverAddress
* @param startCode
* @return the HLog directory name
*/
public static String getHLogDirectoryName(String serverAddress,
long startCode) {
if (serverAddress == null || serverAddress.length() == 0) {
return null;
}
return getHLogDirectoryName(
HServerInfo.getServerName(serverAddress, startCode));
}

/**
* Construct the HLog directory name
*
* @param serverName
* @return the HLog directory name
*/
public static String getHLogDirectoryName(String serverName) {
StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME);
dirName.append("_");
dirName.append(info.getStartCode());
dirName.append("_");
dirName.append(info.getServerAddress().getPort());
dirName.append(serverName);
return dirName.toString();
}