HBASE-457 Factor Master into Master, RegionManager, and ServerManager
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@630545 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
30e6c8c070
commit
1635b75918
|
@ -542,19 +542,23 @@ public class HConnectionManager implements HConstants {
|
|||
HRegionLocation possibleRegion =
|
||||
matchingRegions.get(matchingRegions.lastKey());
|
||||
|
||||
Text endKey = possibleRegion.getRegionInfo().getEndKey();
|
||||
// there is a possibility that the reference was garbage collected
|
||||
// in the instant since we checked isEmpty().
|
||||
if (possibleRegion != null) {
|
||||
Text endKey = possibleRegion.getRegionInfo().getEndKey();
|
||||
|
||||
// make sure that the end key is greater than the row we're looking
|
||||
// for, otherwise the row actually belongs in the next region, not
|
||||
// this one. the exception case is when the endkey is EMPTY_START_ROW,
|
||||
// signifying that the region we're checking is actually the last
|
||||
// region in the table.
|
||||
if (endKey.equals(EMPTY_TEXT) || endKey.compareTo(row) > 0) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Found possible location for " + row + ", " +
|
||||
possibleRegion);
|
||||
// make sure that the end key is greater than the row we're looking
|
||||
// for, otherwise the row actually belongs in the next region, not
|
||||
// this one. The exception is when the end key is empty (EMPTY_TEXT),
|
||||
// signifying that the region we're checking is actually the last
|
||||
// region in the table.
|
||||
if (endKey.equals(EMPTY_TEXT) || endKey.compareTo(row) > 0) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Found possible location for " + row + ", " +
|
||||
possibleRegion);
|
||||
}
|
||||
return possibleRegion;
|
||||
}
|
||||
return possibleRegion;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -101,15 +101,28 @@ abstract class BaseScanner extends Chore implements HConstants {
|
|||
|
||||
protected final boolean rootRegion;
|
||||
protected final HMaster master;
|
||||
protected final RegionManager regionManager;
|
||||
|
||||
protected boolean initialScanComplete;
|
||||
|
||||
protected abstract boolean initialScan();
|
||||
protected abstract void maintenanceScan();
|
||||
|
||||
BaseScanner(final HMaster master, final boolean rootRegion, final int period,
|
||||
final AtomicBoolean stop) {
|
||||
// will use this variable to synchronize and make sure we aren't interrupted
|
||||
// mid-scan
|
||||
final Integer scannerLock = new Integer(0);
|
||||
|
||||
BaseScanner(final HMaster master, final RegionManager regionManager,
|
||||
final boolean rootRegion, final int period, final AtomicBoolean stop) {
|
||||
super(period, stop);
|
||||
this.rootRegion = rootRegion;
|
||||
this.master = master;
|
||||
this.regionManager = regionManager;
|
||||
this.initialScanComplete = false;
|
||||
}
|
||||
|
||||
public boolean isInitialScanComplete() {
|
||||
return initialScanComplete;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -173,8 +186,8 @@ abstract class BaseScanner extends Chore implements HConstants {
|
|||
}
|
||||
numberOfRegionsFound += 1;
|
||||
}
|
||||
if (this.rootRegion) {
|
||||
master.numberOfMetaRegions.set(numberOfRegionsFound);
|
||||
if (rootRegion) {
|
||||
regionManager.setNumMetaRegions(numberOfRegionsFound);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
if (e instanceof RemoteException) {
|
||||
|
@ -328,34 +341,32 @@ abstract class BaseScanner extends Chore implements HConstants {
|
|||
}
|
||||
|
||||
protected void checkAssigned(final HRegionInfo info,
|
||||
final String serverName, final long startCode) throws IOException {
|
||||
final String serverName, final long startCode)
|
||||
throws IOException {
|
||||
|
||||
// Skip region - if ...
|
||||
if(info.isOffline() // offline
|
||||
|| master.killedRegions.contains(info.getRegionName()) // queued for offline
|
||||
|| master.regionsToDelete.contains(info.getRegionName())) { // queued for delete
|
||||
|| regionManager.isClosing(info.getRegionName()) // queued for offline
|
||||
|| regionManager.isMarkedForDeletion(info.getRegionName())) { // queued for delete
|
||||
|
||||
master.unassignedRegions.remove(info);
|
||||
regionManager.noLongerUnassigned(info);
|
||||
return;
|
||||
}
|
||||
HServerInfo storedInfo = null;
|
||||
boolean deadServer = false;
|
||||
if (serverName.length() != 0) {
|
||||
synchronized (master.killList) {
|
||||
Map<Text, HRegionInfo> regionsToKill = master.killList.get(serverName);
|
||||
if (regionsToKill != null &&
|
||||
regionsToKill.containsKey(info.getRegionName())) {
|
||||
|
||||
// Skip if region is on kill list
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("not assigning region (on kill list): " +
|
||||
info.getRegionName());
|
||||
}
|
||||
return;
|
||||
if (regionManager.isMarkedClosedNoReopen(serverName, info.getRegionName())) {
|
||||
// Skip if region is on kill list
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("not assigning region (on kill list): " +
|
||||
info.getRegionName());
|
||||
}
|
||||
return;
|
||||
}
|
||||
storedInfo = master.serversToServerInfo.get(serverName);
|
||||
deadServer = master.deadServers.contains(serverName);
|
||||
|
||||
storedInfo = master.serverManager.getServerInfo(serverName);
|
||||
deadServer = master.serverManager.isDead(serverName);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -366,13 +377,13 @@ abstract class BaseScanner extends Chore implements HConstants {
|
|||
* then:
|
||||
*/
|
||||
if (!deadServer &&
|
||||
((storedInfo != null && storedInfo.getStartCode() != startCode) ||
|
||||
(storedInfo == null &&
|
||||
!master.unassignedRegions.containsKey(info) &&
|
||||
!master.pendingRegions.contains(info.getRegionName())
|
||||
)
|
||||
((storedInfo != null && storedInfo.getStartCode() != startCode) ||
|
||||
(storedInfo == null &&
|
||||
!regionManager.isUnassigned(info) &&
|
||||
!regionManager.isPending(info.getRegionName())
|
||||
)
|
||||
) {
|
||||
)
|
||||
) {
|
||||
|
||||
// The current assignment is invalid
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
@ -380,25 +391,26 @@ abstract class BaseScanner extends Chore implements HConstants {
|
|||
" is not valid: storedInfo: " + storedInfo + ", startCode: " +
|
||||
startCode + ", storedInfo.startCode: " +
|
||||
((storedInfo != null)? storedInfo.getStartCode(): -1) +
|
||||
", unassignedRegions: " + master.unassignedRegions.containsKey(info) +
|
||||
", unassignedRegions: " +
|
||||
regionManager.isUnassigned(info) +
|
||||
", pendingRegions: " +
|
||||
master.pendingRegions.contains(info.getRegionName()));
|
||||
regionManager.isPending(info.getRegionName()));
|
||||
}
|
||||
// Recover the region server's log if there is one.
|
||||
// This is only done from here if we are restarting and there is stale
|
||||
// data in the meta region. Once we are on-line, dead server log
|
||||
// recovery is handled by lease expiration and ProcessServerShutdown
|
||||
if (!master.initialMetaScanComplete && serverName.length() != 0) {
|
||||
if (!regionManager.isInitialMetaScanComplete() && serverName.length() != 0) {
|
||||
StringBuilder dirName = new StringBuilder("log_");
|
||||
dirName.append(serverName.replace(":", "_"));
|
||||
Path logDir = new Path(master.rootdir, dirName.toString());
|
||||
try {
|
||||
if (master.fs.exists(logDir)) {
|
||||
master.splitLogLock.lock();
|
||||
regionManager.splitLogLock.lock();
|
||||
try {
|
||||
HLog.splitLog(master.rootdir, logDir, master.fs, master.conf);
|
||||
} finally {
|
||||
master.splitLogLock.unlock();
|
||||
regionManager.splitLogLock.unlock();
|
||||
}
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
@ -410,7 +422,18 @@ abstract class BaseScanner extends Chore implements HConstants {
|
|||
}
|
||||
}
|
||||
// Now get the region assigned
|
||||
master.unassignedRegions.put(info, ZERO_L);
|
||||
regionManager.setUnassigned(info);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify the thread to die at the end of its next run
|
||||
*/
|
||||
public void interruptIfAlive() {
|
||||
synchronized(scannerLock){
|
||||
if (isAlive()) {
|
||||
super.interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -92,13 +92,14 @@ class ChangeTableState extends TableOperation {
|
|||
LOG.debug("updated columns in row: " + i.getRegionName());
|
||||
}
|
||||
|
||||
if (online) { // Bring offline regions on-line
|
||||
if (!this.master.unassignedRegions.containsKey(i)) {
|
||||
this.master.unassignedRegions.put(i, ZERO_L);
|
||||
if (online) {
|
||||
// Bring offline regions on-line
|
||||
if (!master.regionManager.isUnassigned(i)) {
|
||||
master.regionManager.setUnassigned(i);
|
||||
}
|
||||
|
||||
} else { // Prevent region from getting assigned.
|
||||
this.master.unassignedRegions.remove(i);
|
||||
} else {
|
||||
// Prevent region from getting assigned.
|
||||
master.regionManager.noLongerUnassigned(i);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -119,17 +120,16 @@ class ChangeTableState extends TableOperation {
|
|||
HashMap<Text, HRegionInfo> localKillList =
|
||||
new HashMap<Text, HRegionInfo>();
|
||||
|
||||
synchronized (this.master.killList) {
|
||||
HashMap<Text, HRegionInfo> killedRegions =
|
||||
this.master.killList.get(serverName);
|
||||
if (killedRegions != null) {
|
||||
localKillList.putAll(killedRegions);
|
||||
}
|
||||
Map<Text, HRegionInfo> killedRegions =
|
||||
master.regionManager.getMarkedClosedNoReopen(serverName);
|
||||
if (killedRegions != null) {
|
||||
localKillList.putAll(killedRegions);
|
||||
}
|
||||
|
||||
for (HRegionInfo i: e.getValue()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("adding region " + i.getRegionName() +
|
||||
" to local kill list");
|
||||
" to kill list");
|
||||
}
|
||||
localKillList.put(i.getRegionName(), i);
|
||||
}
|
||||
|
@ -138,7 +138,7 @@ class ChangeTableState extends TableOperation {
|
|||
LOG.debug("inserted local kill list into kill list for server " +
|
||||
serverName);
|
||||
}
|
||||
this.master.killList.put(serverName, localKillList);
|
||||
master.regionManager.markClosedNoReopenBulk(serverName, localKillList);
|
||||
}
|
||||
}
|
||||
servedRegions.clear();
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -23,6 +23,8 @@ import java.io.IOException;
|
|||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||
|
||||
|
@ -36,12 +38,16 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
|||
* action would prevent other work from getting done.
|
||||
*/
|
||||
class MetaScanner extends BaseScanner {
|
||||
/** Work for the meta scanner is queued up here */
|
||||
private volatile BlockingQueue<MetaRegion> metaRegionsToScan =
|
||||
new LinkedBlockingQueue<MetaRegion>();
|
||||
|
||||
private final List<MetaRegion> metaRegionsToRescan =
|
||||
new ArrayList<MetaRegion>();
|
||||
|
||||
/** Constructor */
|
||||
public MetaScanner(HMaster master) {
|
||||
super(master, false, master.metaRescanInterval, master.closed);
|
||||
public MetaScanner(HMaster master, RegionManager regionManager) {
|
||||
super(master, regionManager, false, master.metaRescanInterval, master.closed);
|
||||
}
|
||||
|
||||
private boolean scanOneMetaRegion(MetaRegion region) {
|
||||
|
@ -49,8 +55,8 @@ class MetaScanner extends BaseScanner {
|
|||
// caused by the server going away. Wait until next rescan interval when
|
||||
// things should be back to normal
|
||||
boolean scanSuccessful = false;
|
||||
while (!master.closed.get() && !master.rootScanned &&
|
||||
master.rootRegionLocation.get() == null) {
|
||||
while (!master.closed.get() && !regionManager.isInitialRootScanComplete() &&
|
||||
regionManager.getRootRegionLocation() == null) {
|
||||
master.sleeper.sleep();
|
||||
}
|
||||
if (master.closed.get()) {
|
||||
|
@ -59,9 +65,9 @@ class MetaScanner extends BaseScanner {
|
|||
|
||||
try {
|
||||
// Don't interrupt us while we're working
|
||||
synchronized (master.metaScannerLock) {
|
||||
synchronized (scannerLock) {
|
||||
scanRegion(region);
|
||||
master.onlineMetaRegions.put(region.getStartKey(), region);
|
||||
regionManager.putMetaRegionOnline(region);
|
||||
}
|
||||
scanSuccessful = true;
|
||||
} catch (IOException e) {
|
||||
|
@ -71,7 +77,7 @@ class MetaScanner extends BaseScanner {
|
|||
// so, either it won't be in the onlineMetaRegions list or its host
|
||||
// address has changed and the containsValue will fail. If not
|
||||
// found, best thing to do here is probably return.
|
||||
if (!master.onlineMetaRegions.containsValue(region.getStartKey())) {
|
||||
if (!regionManager.isMetaRegionOnline(region.getStartKey())) {
|
||||
LOG.debug("Scanned region is no longer in map of online " +
|
||||
"regions or its value has changed");
|
||||
return scanSuccessful;
|
||||
|
@ -91,7 +97,7 @@ class MetaScanner extends BaseScanner {
|
|||
MetaRegion region = null;
|
||||
while (!master.closed.get() && region == null && !metaRegionsScanned()) {
|
||||
try {
|
||||
region = master.metaRegionsToScan.poll(master.threadWakeFrequency,
|
||||
region = metaRegionsToScan.poll(master.threadWakeFrequency,
|
||||
TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
// continue
|
||||
|
@ -105,16 +111,13 @@ class MetaScanner extends BaseScanner {
|
|||
}
|
||||
}
|
||||
}
|
||||
master.initialMetaScanComplete = true;
|
||||
initialScanComplete = true;
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void maintenanceScan() {
|
||||
ArrayList<MetaRegion> regions = new ArrayList<MetaRegion>();
|
||||
synchronized (master.onlineMetaRegions) {
|
||||
regions.addAll(master.onlineMetaRegions.values());
|
||||
}
|
||||
List<MetaRegion> regions = regionManager.getListOfOnlineMetaRegions();
|
||||
for (MetaRegion r: regions) {
|
||||
scanOneMetaRegion(r);
|
||||
}
|
||||
|
@ -126,8 +129,8 @@ class MetaScanner extends BaseScanner {
|
|||
* regions. This wakes up any threads that were waiting for this to happen.
|
||||
*/
|
||||
private synchronized boolean metaRegionsScanned() {
|
||||
if (!master.rootScanned ||
|
||||
master.numberOfMetaRegions.get() != master.onlineMetaRegions.size()) {
|
||||
if (!regionManager.isInitialRootScanComplete() ||
|
||||
regionManager.numMetaRegions() != regionManager.numOnlineMetaRegions()) {
|
||||
return false;
|
||||
}
|
||||
LOG.info("all meta regions scanned");
|
||||
|
@ -141,8 +144,8 @@ class MetaScanner extends BaseScanner {
|
|||
*/
|
||||
synchronized boolean waitForMetaRegionsOrClose() {
|
||||
while (!master.closed.get()) {
|
||||
if (master.rootScanned &&
|
||||
master.numberOfMetaRegions.get() == master.onlineMetaRegions.size()) {
|
||||
if (regionManager.isInitialRootScanComplete() &&
|
||||
regionManager.numMetaRegions() == regionManager.numOnlineMetaRegions()) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -154,4 +157,11 @@ class MetaScanner extends BaseScanner {
|
|||
}
|
||||
return master.closed.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add another meta region to scan to the queue.
|
||||
*/
|
||||
void addMetaRegionToScan(MetaRegion m) throws InterruptedException {
|
||||
metaRegionsToScan.add(m);
|
||||
}
|
||||
}
|
|
@ -90,9 +90,7 @@ class ProcessRegionClose extends ProcessRegionStatusChange {
|
|||
|
||||
if (reassignRegion) {
|
||||
LOG.info("reassign region: " + regionInfo.getRegionName());
|
||||
|
||||
master.unassignedRegions.put(regionInfo, ZERO_L);
|
||||
|
||||
master.regionManager.setUnassigned(regionInfo);
|
||||
} else if (deleteRegion) {
|
||||
try {
|
||||
HRegion.deleteRegion(master.fs, master.rootdir, regionInfo);
|
||||
|
|
|
@ -90,23 +90,23 @@ class ProcessRegionOpen extends ProcessRegionStatusChange {
|
|||
// It's a meta region.
|
||||
MetaRegion m = new MetaRegion(this.serverAddress,
|
||||
this.regionInfo.getRegionName(), this.regionInfo.getStartKey());
|
||||
if (!master.initialMetaScanComplete) {
|
||||
if (!master.regionManager.isInitialMetaScanComplete()) {
|
||||
// Put it on the queue to be scanned for the first time.
|
||||
try {
|
||||
LOG.debug("Adding " + m.toString() + " to regions to scan");
|
||||
master.metaRegionsToScan.put(m);
|
||||
master.regionManager.addMetaRegionToScan(m);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(
|
||||
"Putting into metaRegionsToScan was interrupted.", e);
|
||||
"Putting into metaRegionsToScan was interrupted.", e);
|
||||
}
|
||||
} else {
|
||||
// Add it to the online meta regions
|
||||
LOG.debug("Adding to onlineMetaRegions: " + m.toString());
|
||||
master.onlineMetaRegions.put(this.regionInfo.getStartKey(), m);
|
||||
master.regionManager.putMetaRegionOnline(m);
|
||||
}
|
||||
}
|
||||
// If updated successfully, remove from pending list.
|
||||
master.pendingRegions.remove(regionInfo.getRegionName());
|
||||
master.regionManager.noLongerPending(regionInfo.getRegionName());
|
||||
break;
|
||||
} catch (IOException e) {
|
||||
if (tries == numRetries - 1) {
|
||||
|
|
|
@ -56,7 +56,7 @@ abstract class ProcessRegionStatusChange extends RegionServerOperation {
|
|||
available = false;
|
||||
}
|
||||
} else {
|
||||
if (!master.rootScanned || !metaTableAvailable()) {
|
||||
if (!master.regionManager.isInitialRootScanComplete() || !metaTableAvailable()) {
|
||||
// The root region has not been scanned or the meta table is not
|
||||
// available so we can't proceed.
|
||||
// Put the operation on the delayedToDoQueue
|
||||
|
@ -68,26 +68,18 @@ abstract class ProcessRegionStatusChange extends RegionServerOperation {
|
|||
}
|
||||
|
||||
protected HRegionInterface getMetaServer() throws IOException {
|
||||
if (this.isMetaTable) {
|
||||
this.metaRegionName = HRegionInfo.rootRegionInfo.getRegionName();
|
||||
if (isMetaTable) {
|
||||
metaRegionName = HRegionInfo.rootRegionInfo.getRegionName();
|
||||
} else {
|
||||
if (this.metaRegion == null) {
|
||||
synchronized (master.onlineMetaRegions) {
|
||||
metaRegion = master.onlineMetaRegions.size() == 1 ?
|
||||
master.onlineMetaRegions.get(master.onlineMetaRegions.firstKey()) :
|
||||
master.onlineMetaRegions.containsKey(regionInfo.getRegionName()) ?
|
||||
master.onlineMetaRegions.get(regionInfo.getRegionName()) :
|
||||
master.onlineMetaRegions.get(master.onlineMetaRegions.headMap(
|
||||
regionInfo.getRegionName()).lastKey());
|
||||
}
|
||||
this.metaRegionName = metaRegion.getRegionName();
|
||||
if (metaRegion == null) {
|
||||
metaRegion = master.regionManager.getFirstMetaRegionForRegion(regionInfo);
|
||||
metaRegionName = metaRegion.getRegionName();
|
||||
}
|
||||
}
|
||||
|
||||
HServerAddress server = null;
|
||||
if (isMetaTable) {
|
||||
server = master.rootRegionLocation.get();
|
||||
|
||||
server = master.getRootRegionLocation();
|
||||
} else {
|
||||
server = metaRegion.getServer();
|
||||
}
|
||||
|
|
|
@ -151,42 +151,30 @@ class ProcessServerShutdown extends RegionServerOperation {
|
|||
LOG.debug("removing meta region " + info.getRegionName() +
|
||||
" from online meta regions");
|
||||
}
|
||||
master.onlineMetaRegions.remove(info.getStartKey());
|
||||
master.regionManager.offlineMetaRegion(info.getStartKey());
|
||||
}
|
||||
|
||||
ToDoEntry todo = new ToDoEntry(row, info);
|
||||
toDoList.add(todo);
|
||||
|
||||
if (master.killList.containsKey(deadServerName)) {
|
||||
HashMap<Text, HRegionInfo> regionsToKill =
|
||||
new HashMap<Text, HRegionInfo>();
|
||||
synchronized (master.killList) {
|
||||
regionsToKill.putAll(master.killList.get(deadServerName));
|
||||
if (master.regionManager.isMarkedClosedNoReopen(deadServerName, info.getRegionName())) {
|
||||
master.regionManager.noLongerMarkedClosedNoReopen(deadServerName, info.getRegionName());
|
||||
master.regionManager.noLongerUnassigned(info);
|
||||
if (master.regionManager.isMarkedForDeletion(info.getRegionName())) {
|
||||
// Delete this region
|
||||
master.regionManager.regionDeleted(info.getRegionName());
|
||||
todo.deleteRegion = true;
|
||||
} else {
|
||||
// Mark region offline
|
||||
todo.regionOffline = true;
|
||||
}
|
||||
|
||||
if (regionsToKill.containsKey(info.getRegionName())) {
|
||||
regionsToKill.remove(info.getRegionName());
|
||||
master.killList.put(deadServerName, regionsToKill);
|
||||
master.unassignedRegions.remove(info);
|
||||
synchronized (master.regionsToDelete) {
|
||||
if (master.regionsToDelete.contains(info.getRegionName())) {
|
||||
// Delete this region
|
||||
master.regionsToDelete.remove(info.getRegionName());
|
||||
todo.deleteRegion = true;
|
||||
} else {
|
||||
// Mark region offline
|
||||
todo.regionOffline = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
// Get region reassigned
|
||||
regions.add(info);
|
||||
|
||||
// If it was pending, remove.
|
||||
// Otherwise will obstruct its getting reassigned.
|
||||
master.pendingRegions.remove(info.getRegionName());
|
||||
master.regionManager.noLongerPending(info.getRegionName());
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
@ -211,27 +199,29 @@ class ProcessServerShutdown extends RegionServerOperation {
|
|||
|
||||
// Get regions reassigned
|
||||
for (HRegionInfo info: regions) {
|
||||
master.unassignedRegions.put(info, ZERO_L);
|
||||
master.regionManager.setUnassigned(info);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean process() throws IOException {
|
||||
LOG.info("process shutdown of server " + deadServer + ": logSplit: " +
|
||||
this.logSplit + ", rootRescanned: " + this.rootRescanned +
|
||||
", numberOfMetaRegions: " + master.numberOfMetaRegions.get() +
|
||||
", onlineMetaRegions.size(): " + master.onlineMetaRegions.size());
|
||||
this.logSplit + ", rootRescanned: " + rootRescanned +
|
||||
", numberOfMetaRegions: " +
|
||||
master.regionManager.numMetaRegions() +
|
||||
", onlineMetaRegions.size(): " +
|
||||
master.regionManager.numOnlineMetaRegions());
|
||||
|
||||
if (!logSplit) {
|
||||
// Process the old log file
|
||||
if (master.fs.exists(oldLogDir)) {
|
||||
if (!master.splitLogLock.tryLock()) {
|
||||
if (!master.regionManager.splitLogLock.tryLock()) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
HLog.splitLog(master.rootdir, oldLogDir, master.fs, master.conf);
|
||||
} finally {
|
||||
master.splitLogLock.unlock();
|
||||
master.regionManager.splitLogLock.unlock();
|
||||
}
|
||||
}
|
||||
logSplit = true;
|
||||
|
@ -253,23 +243,23 @@ class ProcessServerShutdown extends RegionServerOperation {
|
|||
if (master.closed.get()) {
|
||||
return true;
|
||||
}
|
||||
server = master.connection.getHRegionConnection(master.rootRegionLocation.get());
|
||||
server = master.connection.getHRegionConnection(
|
||||
master.getRootRegionLocation());
|
||||
scannerId = -1L;
|
||||
|
||||
try {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("process server shutdown scanning root region on " +
|
||||
master.rootRegionLocation.get().getBindAddress());
|
||||
master.getRootRegionLocation().getBindAddress());
|
||||
}
|
||||
scannerId =
|
||||
server.openScanner(HRegionInfo.rootRegionInfo.getRegionName(),
|
||||
COLUMN_FAMILY_ARRAY, EMPTY_START_ROW,
|
||||
System.currentTimeMillis(), null);
|
||||
COLUMN_FAMILY_ARRAY, EMPTY_START_ROW,
|
||||
System.currentTimeMillis(), null);
|
||||
|
||||
scanMetaRegion(server, scannerId,
|
||||
HRegionInfo.rootRegionInfo.getRegionName());
|
||||
HRegionInfo.rootRegionInfo.getRegionName());
|
||||
break;
|
||||
|
||||
} catch (IOException e) {
|
||||
if (tries == numRetries - 1) {
|
||||
throw RemoteExceptionHandler.checkIOException(e);
|
||||
|
@ -278,8 +268,8 @@ class ProcessServerShutdown extends RegionServerOperation {
|
|||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("process server shutdown scanning root region on " +
|
||||
master.rootRegionLocation.get().getBindAddress() + " finished " +
|
||||
Thread.currentThread().getName());
|
||||
master.getRootRegionLocation().getBindAddress() +
|
||||
" finished " + Thread.currentThread().getName());
|
||||
}
|
||||
rootRescanned = true;
|
||||
}
|
||||
|
@ -296,34 +286,31 @@ class ProcessServerShutdown extends RegionServerOperation {
|
|||
if (master.closed.get()) {
|
||||
return true;
|
||||
}
|
||||
List<MetaRegion> regions = new ArrayList<MetaRegion>();
|
||||
synchronized (master.onlineMetaRegions) {
|
||||
regions.addAll(master.onlineMetaRegions.values());
|
||||
}
|
||||
List<MetaRegion> regions = master.regionManager.getListOfOnlineMetaRegions();
|
||||
for (MetaRegion r: regions) {
|
||||
HRegionInterface server = null;
|
||||
long scannerId = -1L;
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("process server shutdown scanning " +
|
||||
r.getRegionName() + " on " + r.getServer() + " " +
|
||||
Thread.currentThread().getName());
|
||||
r.getRegionName() + " on " + r.getServer() + " " +
|
||||
Thread.currentThread().getName());
|
||||
}
|
||||
server = master.connection.getHRegionConnection(r.getServer());
|
||||
|
||||
scannerId =
|
||||
server.openScanner(r.getRegionName(), COLUMN_FAMILY_ARRAY,
|
||||
EMPTY_START_ROW, System.currentTimeMillis(), null);
|
||||
EMPTY_START_ROW, System.currentTimeMillis(), null);
|
||||
|
||||
scanMetaRegion(server, scannerId, r.getRegionName());
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("process server shutdown finished scanning " +
|
||||
r.getRegionName() + " on " + r.getServer() + " " +
|
||||
Thread.currentThread().getName());
|
||||
r.getRegionName() + " on " + r.getServer() + " " +
|
||||
Thread.currentThread().getName());
|
||||
}
|
||||
}
|
||||
master.deadServers.remove(deadServerName);
|
||||
master.serverManager.removeDeadServer(deadServerName);
|
||||
break;
|
||||
|
||||
} catch (IOException e) {
|
||||
|
|
|
@ -0,0 +1,630 @@
|
|||
/**
|
||||
* Copyright 2008 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HServerAddress;
|
||||
import org.apache.hadoop.hbase.HServerInfo;
|
||||
import org.apache.hadoop.hbase.HServerLoad;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegion;
|
||||
import org.apache.hadoop.hbase.HRegionInterface;
|
||||
import org.apache.hadoop.hbase.HMsg;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.io.BatchUpdate;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
|
||||
/**
|
||||
* Class to manage assigning regions to servers, state of root and meta, etc.
|
||||
*/
|
||||
class RegionManager implements HConstants {
|
||||
protected static final Log LOG = LogFactory.getLog(RegionManager.class.getName());
|
||||
|
||||
private volatile AtomicReference<HServerAddress> rootRegionLocation =
|
||||
new AtomicReference<HServerAddress>(null);
|
||||
|
||||
final Lock splitLogLock = new ReentrantLock();
|
||||
|
||||
private final RootScanner rootScannerThread;
|
||||
final MetaScanner metaScannerThread;
|
||||
|
||||
/** Set by root scanner to indicate the number of meta regions */
|
||||
private final AtomicInteger numberOfMetaRegions = new AtomicInteger();
|
||||
|
||||
/** These are the online meta regions */
|
||||
private final SortedMap<Text, MetaRegion> onlineMetaRegions =
|
||||
Collections.synchronizedSortedMap(new TreeMap<Text, MetaRegion>());
|
||||
|
||||
/**
|
||||
* The 'unassignedRegions' table maps from a HRegionInfo to a timestamp that
|
||||
* indicates the last time we *tried* to assign the region to a RegionServer.
|
||||
* If the timestamp is out of date, then we can try to reassign it.
|
||||
*
|
||||
* We fill 'unassignedRegions' by scanning ROOT and META tables, learning the
|
||||
* set of all known valid regions.
|
||||
*
|
||||
* <p>Items are removed from this list when a region server reports in that
|
||||
* the region has been deployed.
|
||||
*/
|
||||
private final SortedMap<HRegionInfo, Long> unassignedRegions =
|
||||
Collections.synchronizedSortedMap(new TreeMap<HRegionInfo, Long>());
|
||||
|
||||
/**
|
||||
* Regions that have been assigned, and the server has reported that it has
|
||||
* started serving it, but that we have not yet recorded in the meta table.
|
||||
*/
|
||||
private final Set<Text> pendingRegions =
|
||||
Collections.synchronizedSet(new HashSet<Text>());
|
||||
|
||||
/**
|
||||
* The 'killList' is a list of regions that are going to be closed, but not
|
||||
* reopened.
|
||||
*/
|
||||
private final Map<String, Map<Text, HRegionInfo>> killList =
|
||||
new ConcurrentHashMap<String, Map<Text, HRegionInfo>>();
|
||||
|
||||
/** 'killedRegions' contains regions that are in the process of being closed */
|
||||
private final Set<Text> killedRegions =
|
||||
Collections.synchronizedSet(new HashSet<Text>());
|
||||
|
||||
/**
|
||||
* 'regionsToDelete' contains regions that need to be deleted, but cannot be
|
||||
* until the region server closes it
|
||||
*/
|
||||
private final Set<Text> regionsToDelete =
|
||||
Collections.synchronizedSet(new HashSet<Text>());
|
||||
|
||||
private HMaster master;
|
||||
|
||||
RegionManager(HMaster master) {
|
||||
this.master = master;
|
||||
|
||||
// The root region
|
||||
rootScannerThread = new RootScanner(master, this);
|
||||
|
||||
// Scans the meta table
|
||||
metaScannerThread = new MetaScanner(master, this);
|
||||
|
||||
unassignRootRegion();
|
||||
}
|
||||
|
||||
void start() {
|
||||
Threads.setDaemonThreadRunning(rootScannerThread,
|
||||
"RegionManager.rootScanner");
|
||||
Threads.setDaemonThreadRunning(metaScannerThread,
|
||||
"RegionManager.metaScanner");
|
||||
}
|
||||
|
||||
/*
|
||||
* Unassign the root region.
|
||||
* This method would be used in the case where the root region server has died
|
||||
* without reporting in. Currently, we just flounder and never recover. We
|
||||
* could 'notice' dead region server in root scanner -- if we failed access
|
||||
* multiple times -- but reassigning root is catastrophic.
|
||||
*
|
||||
*/
|
||||
void unassignRootRegion() {
|
||||
rootRegionLocation.set(null);
|
||||
if (!master.shutdownRequested) {
|
||||
unassignedRegions.put(HRegionInfo.rootRegionInfo, ZERO_L);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Assigns regions to region servers attempting to balance the load across
|
||||
* all region servers
|
||||
*
|
||||
* @param info
|
||||
* @param serverName
|
||||
* @param returnMsgs
|
||||
*/
|
||||
void assignRegions(HServerInfo info, String serverName,
|
||||
ArrayList<HMsg> returnMsgs) {
|
||||
|
||||
synchronized (unassignedRegions) {
|
||||
|
||||
// We need to hold a lock on assign attempts while we figure out what to
|
||||
// do so that multiple threads do not execute this method in parallel
|
||||
// resulting in assigning the same region to multiple servers.
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
Set<HRegionInfo> regionsToAssign = new HashSet<HRegionInfo>();
|
||||
for (Map.Entry<HRegionInfo, Long> e: unassignedRegions.entrySet()) {
|
||||
HRegionInfo i = e.getKey();
|
||||
if (numberOfMetaRegions.get() != onlineMetaRegions.size() &&
|
||||
!i.isMetaRegion()) {
|
||||
// Can't assign user regions until all meta regions have been assigned
|
||||
// and are on-line
|
||||
continue;
|
||||
}
|
||||
long diff = now - e.getValue().longValue();
|
||||
if (diff > master.maxRegionOpenTime) {
|
||||
regionsToAssign.add(e.getKey());
|
||||
}
|
||||
}
|
||||
int nRegionsToAssign = regionsToAssign.size();
|
||||
if (nRegionsToAssign <= 0) {
|
||||
// No regions to assign. Return.
|
||||
return;
|
||||
}
|
||||
|
||||
if (master.serverManager.numServers() == 1) {
|
||||
assignRegionsToOneServer(regionsToAssign, serverName, returnMsgs);
|
||||
// Finished. Return.
|
||||
return;
|
||||
}
|
||||
|
||||
// Multiple servers in play.
|
||||
// We need to allocate regions only to most lightly loaded servers.
|
||||
HServerLoad thisServersLoad = info.getLoad();
|
||||
int nregions = regionsPerServer(nRegionsToAssign, thisServersLoad);
|
||||
nRegionsToAssign -= nregions;
|
||||
if (nRegionsToAssign > 0) {
|
||||
// We still have more regions to assign. See how many we can assign
|
||||
// before this server becomes more heavily loaded than the next
|
||||
// most heavily loaded server.
|
||||
SortedMap<HServerLoad, Set<String>> heavyServers =
|
||||
new TreeMap<HServerLoad, Set<String>>();
|
||||
synchronized (master.serverManager.loadToServers) {
|
||||
heavyServers.putAll(
|
||||
master.serverManager.loadToServers.tailMap(thisServersLoad));
|
||||
}
|
||||
int nservers = 0;
|
||||
HServerLoad heavierLoad = null;
|
||||
for (Map.Entry<HServerLoad, Set<String>> e : heavyServers.entrySet()) {
|
||||
Set<String> servers = e.getValue();
|
||||
nservers += servers.size();
|
||||
if (e.getKey().compareTo(thisServersLoad) == 0) {
|
||||
// This is the load factor of the server we are considering
|
||||
nservers -= 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
// If we get here, we are at the first load entry that is a
|
||||
// heavier load than the server we are considering
|
||||
heavierLoad = e.getKey();
|
||||
break;
|
||||
}
|
||||
|
||||
nregions = 0;
|
||||
if (heavierLoad != null) {
|
||||
// There is a more heavily loaded server
|
||||
for (HServerLoad load =
|
||||
new HServerLoad(thisServersLoad.getNumberOfRequests(),
|
||||
thisServersLoad.getNumberOfRegions());
|
||||
load.compareTo(heavierLoad) <= 0 && nregions < nRegionsToAssign;
|
||||
load.setNumberOfRegions(load.getNumberOfRegions() + 1), nregions++) {
|
||||
// continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (nregions < nRegionsToAssign) {
|
||||
// There are some more heavily loaded servers
|
||||
// but we can't assign all the regions to this server.
|
||||
if (nservers > 0) {
|
||||
// There are other servers that can share the load.
|
||||
// Split regions that need assignment across the servers.
|
||||
nregions = (int) Math.ceil((1.0 * nRegionsToAssign)
|
||||
/ (1.0 * nservers));
|
||||
} else {
|
||||
// No other servers with same load.
|
||||
// Split regions over all available servers
|
||||
nregions = (int) Math.ceil((1.0 * nRegionsToAssign)
|
||||
/ (1.0 * master.serverManager.numServers()));
|
||||
}
|
||||
} else {
|
||||
// Assign all regions to this server
|
||||
nregions = nRegionsToAssign;
|
||||
}
|
||||
|
||||
now = System.currentTimeMillis();
|
||||
for (HRegionInfo regionInfo: regionsToAssign) {
|
||||
LOG.info("assigning region " + regionInfo.getRegionName() +
|
||||
" to server " + serverName);
|
||||
unassignedRegions.put(regionInfo, Long.valueOf(now));
|
||||
returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
|
||||
if (--nregions <= 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* @param nRegionsToAssign
|
||||
* @param thisServersLoad
|
||||
* @return How many regions we can assign to more lightly loaded servers
|
||||
*/
|
||||
private int regionsPerServer(final int nRegionsToAssign,
|
||||
final HServerLoad thisServersLoad) {
|
||||
|
||||
SortedMap<HServerLoad, Set<String>> lightServers =
|
||||
new TreeMap<HServerLoad, Set<String>>();
|
||||
|
||||
synchronized (master.serverManager.loadToServers) {
|
||||
lightServers.putAll(master.serverManager.loadToServers.headMap(thisServersLoad));
|
||||
}
|
||||
|
||||
int nRegions = 0;
|
||||
for (Map.Entry<HServerLoad, Set<String>> e : lightServers.entrySet()) {
|
||||
HServerLoad lightLoad = new HServerLoad(e.getKey().getNumberOfRequests(),
|
||||
e.getKey().getNumberOfRegions());
|
||||
do {
|
||||
lightLoad.setNumberOfRegions(lightLoad.getNumberOfRegions() + 1);
|
||||
nRegions += 1;
|
||||
} while (lightLoad.compareTo(thisServersLoad) <= 0
|
||||
&& nRegions < nRegionsToAssign);
|
||||
|
||||
nRegions *= e.getValue().size();
|
||||
if (nRegions >= nRegionsToAssign) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return nRegions;
|
||||
}
|
||||
|
||||
/*
|
||||
* Assign all to the only server. An unlikely case but still possible.
|
||||
* @param regionsToAssign
|
||||
* @param serverName
|
||||
* @param returnMsgs
|
||||
*/
|
||||
private void assignRegionsToOneServer(final Set<HRegionInfo> regionsToAssign,
|
||||
final String serverName, final ArrayList<HMsg> returnMsgs) {
|
||||
long now = System.currentTimeMillis();
|
||||
for (HRegionInfo regionInfo: regionsToAssign) {
|
||||
LOG.info("assigning region " + regionInfo.getRegionName() +
|
||||
" to the only server " + serverName);
|
||||
unassignedRegions.put(regionInfo, Long.valueOf(now));
|
||||
returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Read-only map of online regions.
|
||||
*/
|
||||
public Map<Text, MetaRegion> getOnlineMetaRegions() {
|
||||
return Collections.unmodifiableSortedMap(onlineMetaRegions);
|
||||
}
|
||||
|
||||
/*
|
||||
* Stop the root and meta scanners so that the region servers serving meta
|
||||
* regions can shut down.
|
||||
*/
|
||||
public void stopScanners() {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("telling root scanner to stop");
|
||||
}
|
||||
rootScannerThread.interruptIfAlive();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("telling meta scanner to stop");
|
||||
}
|
||||
metaScannerThread.interruptIfAlive();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("meta and root scanners notified");
|
||||
}
|
||||
}
|
||||
|
||||
/** Stop the region assigner */
|
||||
public void stop() {
|
||||
try {
|
||||
if (rootScannerThread.isAlive()) {
|
||||
rootScannerThread.join(); // Wait for the root scanner to finish.
|
||||
}
|
||||
} catch (Exception iex) {
|
||||
LOG.warn("root scanner", iex);
|
||||
}
|
||||
try {
|
||||
if (metaScannerThread.isAlive()) {
|
||||
metaScannerThread.join(); // Wait for meta scanner to finish.
|
||||
}
|
||||
} catch(Exception iex) {
|
||||
LOG.warn("meta scanner", iex);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean waitForMetaRegionsOrClose() throws IOException {
|
||||
return metaScannerThread.waitForMetaRegionsOrClose();
|
||||
}
|
||||
|
||||
/**
|
||||
* Search our map of online meta regions to find the first meta region that
|
||||
* should contain a pointer to <i>newRegion</i>.
|
||||
*/
|
||||
public MetaRegion getFirstMetaRegionForRegion(HRegionInfo newRegion) {
|
||||
synchronized (onlineMetaRegions) {
|
||||
if (onlineMetaRegions.size() == 0) {
|
||||
return null;
|
||||
} else if (onlineMetaRegions.size() == 1) {
|
||||
return onlineMetaRegions.get(onlineMetaRegions.firstKey());
|
||||
} else {
|
||||
if (onlineMetaRegions.containsKey(newRegion.getRegionName())) {
|
||||
return onlineMetaRegions.get(newRegion.getRegionName());
|
||||
} else {
|
||||
return onlineMetaRegions.get(onlineMetaRegions.headMap(
|
||||
newRegion.getTableDesc().getName()).lastKey());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a set of all the meta regions that contain info about a given table.
|
||||
*/
|
||||
public Set<MetaRegion> getMetaRegionsForTable(Text tableName) {
|
||||
Text firstMetaRegion = null;
|
||||
Set<MetaRegion> metaRegions = new HashSet<MetaRegion>();
|
||||
|
||||
synchronized (onlineMetaRegions) {
|
||||
if (onlineMetaRegions.size() == 1) {
|
||||
firstMetaRegion = onlineMetaRegions.firstKey();
|
||||
} else if (onlineMetaRegions.containsKey(tableName)) {
|
||||
firstMetaRegion = tableName;
|
||||
} else {
|
||||
firstMetaRegion = onlineMetaRegions.headMap(tableName).lastKey();
|
||||
}
|
||||
metaRegions.addAll(onlineMetaRegions.tailMap(firstMetaRegion).values());
|
||||
}
|
||||
return metaRegions;
|
||||
}
|
||||
|
||||
public void createRegion(HRegionInfo newRegion, HRegionInterface server,
|
||||
Text metaRegionName)
|
||||
throws IOException {
|
||||
// 2. Create the HRegion
|
||||
HRegion region =
|
||||
HRegion.createHRegion(newRegion, master.rootdir, master.conf);
|
||||
|
||||
// 3. Insert into meta
|
||||
HRegionInfo info = region.getRegionInfo();
|
||||
Text regionName = region.getRegionName();
|
||||
BatchUpdate b = new BatchUpdate(regionName);
|
||||
b.put(COL_REGIONINFO, Writables.getBytes(info));
|
||||
server.batchUpdate(metaRegionName, b);
|
||||
|
||||
// 4. Close the new region to flush it to disk. Close its log file too.
|
||||
region.close();
|
||||
region.getLog().closeAndDelete();
|
||||
|
||||
// 5. Get it assigned to a server
|
||||
unassignedRegions.put(info, ZERO_L);
|
||||
}
|
||||
|
||||
/** Set a MetaRegion as online. */
|
||||
public void putMetaRegionOnline(MetaRegion metaRegion) {
|
||||
onlineMetaRegions.put(metaRegion.getStartKey(), metaRegion);
|
||||
}
|
||||
|
||||
/** Get a list of online MetaRegions */
|
||||
public List<MetaRegion> getListOfOnlineMetaRegions() {
|
||||
List<MetaRegion> regions = new ArrayList<MetaRegion>();
|
||||
synchronized(onlineMetaRegions) {
|
||||
regions.addAll(onlineMetaRegions.values());
|
||||
}
|
||||
return regions;
|
||||
}
|
||||
|
||||
/** count of online meta regions */
|
||||
public int numOnlineMetaRegions() {
|
||||
return onlineMetaRegions.size();
|
||||
}
|
||||
|
||||
/** Check if a meta region is online by its name */
|
||||
public boolean isMetaRegionOnline(Text startKey) {
|
||||
return onlineMetaRegions.containsKey(startKey);
|
||||
}
|
||||
|
||||
/** Set an online MetaRegion offline - remove it from the map. **/
|
||||
public void offlineMetaRegion(Text startKey) {
|
||||
onlineMetaRegions.remove(startKey);
|
||||
}
|
||||
|
||||
/** Check if a region is unassigned */
|
||||
public boolean isUnassigned(HRegionInfo info) {
|
||||
return unassignedRegions.containsKey(info);
|
||||
}
|
||||
|
||||
/** Check if a region is pending */
|
||||
public boolean isPending(Text regionName) {
|
||||
return pendingRegions.contains(regionName);
|
||||
}
|
||||
|
||||
/** Set a region to unassigned */
|
||||
public void setUnassigned(HRegionInfo info) {
|
||||
unassignedRegions.put(info, ZERO_L);
|
||||
}
|
||||
|
||||
/** Set a region to pending assignment */
|
||||
public void setPending(Text regionName) {
|
||||
pendingRegions.add(regionName);
|
||||
}
|
||||
|
||||
/** Unset region's pending status */
|
||||
public void noLongerPending(Text regionName) {
|
||||
pendingRegions.remove(regionName);
|
||||
}
|
||||
|
||||
/** Update the deadline for a region assignment to be completed */
|
||||
public void updateAssignmentDeadline(HRegionInfo info) {
|
||||
synchronized (unassignedRegions) {
|
||||
// Region server has acknowledged request to open region.
|
||||
// Extend region open time by max region open time.
|
||||
unassignedRegions.put(info,
|
||||
System.currentTimeMillis() + master.maxRegionOpenTime);
|
||||
}
|
||||
}
|
||||
|
||||
/** Unset a region's unassigned status */
|
||||
public void noLongerUnassigned(HRegionInfo info) {
|
||||
unassignedRegions.remove(info);
|
||||
}
|
||||
|
||||
/** Mark a region to be closed and not reopened */
|
||||
public void markClosedNoReopen(String serverName, HRegionInfo info) {
|
||||
synchronized (killList) {
|
||||
Map<Text, HRegionInfo> serverKillList = killList.get(serverName);
|
||||
if (serverKillList != null) {
|
||||
serverKillList.put(info.getRegionName(), info);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Mark a bunch of regions as closed not reopen at once for a server */
|
||||
public void markClosedNoReopenBulk(String serverName,
|
||||
Map<Text, HRegionInfo> map) {
|
||||
killList.put(serverName, map);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a map of region names to region infos waiting to be offlined for a
|
||||
* given server
|
||||
*/
|
||||
public Map<Text, HRegionInfo> getMarkedClosedNoReopen(String serverName) {
|
||||
return killList.get(serverName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a region is marked as closed not reopen.
|
||||
*/
|
||||
public boolean isMarkedClosedNoReopen(String serverName, Text regionName) {
|
||||
synchronized (killList) {
|
||||
Map<Text, HRegionInfo> regionsToKill =
|
||||
killList.get(serverName);
|
||||
return (regionsToKill != null && regionsToKill.containsKey(regionName));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark a region as no longer waiting to be closed and not reopened.
|
||||
*/
|
||||
public void noLongerMarkedClosedNoReopen(String serverName, Text regionName) {
|
||||
synchronized (killList) {
|
||||
Map<Text, HRegionInfo> serverKillList = killList.get(serverName);
|
||||
if (serverKillList != null) {
|
||||
serverKillList.remove(regionName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Check if a region is closing */
|
||||
public boolean isClosing(Text regionName) {
|
||||
return killedRegions.contains(regionName);
|
||||
}
|
||||
|
||||
/** Set a region as no longer closing (closed?) */
|
||||
public void noLongerClosing(Text regionName) {
|
||||
killedRegions.remove(regionName);
|
||||
}
|
||||
|
||||
/** mark a region as closing */
|
||||
public void setClosing(Text regionName) {
|
||||
killedRegions.add(regionName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a meta region to the scan queue
|
||||
*/
|
||||
public void addMetaRegionToScan(MetaRegion m) throws InterruptedException {
|
||||
metaScannerThread.addMetaRegionToScan(m);
|
||||
}
|
||||
|
||||
/** Mark a region as to be deleted */
|
||||
public void markRegionForDeletion(Text regionName) {
|
||||
regionsToDelete.add(regionName);
|
||||
}
|
||||
|
||||
/** Note that a region to delete has been deleted */
|
||||
public void regionDeleted(Text regionName) {
|
||||
regionsToDelete.remove(regionName);
|
||||
}
|
||||
|
||||
/** Check if a region is marked for deletion */
|
||||
public boolean isMarkedForDeletion(Text regionName) {
|
||||
return regionsToDelete.contains(regionName);
|
||||
}
|
||||
|
||||
public boolean isInitialRootScanComplete() {
|
||||
return rootScannerThread.isInitialScanComplete();
|
||||
}
|
||||
|
||||
public boolean isInitialMetaScanComplete() {
|
||||
return metaScannerThread.isInitialScanComplete();
|
||||
}
|
||||
|
||||
public HServerAddress getRootRegionLocation() {
|
||||
return rootRegionLocation.get();
|
||||
}
|
||||
|
||||
public void waitForRootRegionLocation() {
|
||||
synchronized (rootRegionLocation) {
|
||||
while(!master.closed.get() && rootRegionLocation.get() == null) {
|
||||
// rootRegionLocation will be filled in when we get an 'open region'
|
||||
// regionServerReport message from the HRegionServer that has been
|
||||
// allocated the ROOT region below.
|
||||
try {
|
||||
rootRegionLocation.wait();
|
||||
} catch (InterruptedException e) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public int numMetaRegions() {
|
||||
return numberOfMetaRegions.get();
|
||||
}
|
||||
|
||||
public void incrementNumMetaRegions() {
|
||||
numberOfMetaRegions.incrementAndGet();
|
||||
}
|
||||
|
||||
public void setRootRegionLocation(HServerAddress address) {
|
||||
synchronized (rootRegionLocation) {
|
||||
rootRegionLocation.set(new HServerAddress(address));
|
||||
rootRegionLocation.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
public void setNumMetaRegions(int num) {
|
||||
numberOfMetaRegions.set(num);
|
||||
}
|
||||
}
|
|
@ -62,7 +62,7 @@ abstract class RegionServerOperation implements Delayed, HConstants {
|
|||
|
||||
protected boolean rootAvailable() {
|
||||
boolean available = true;
|
||||
if (master.rootRegionLocation.get() == null) {
|
||||
if (master.getRootRegionLocation() == null) {
|
||||
available = false;
|
||||
requeue();
|
||||
}
|
||||
|
@ -72,10 +72,13 @@ abstract class RegionServerOperation implements Delayed, HConstants {
|
|||
protected boolean metaTableAvailable() {
|
||||
boolean available = true;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("numberOfMetaRegions: " + master.numberOfMetaRegions.get() +
|
||||
", onlineMetaRegions.size(): " + master.onlineMetaRegions.size());
|
||||
LOG.debug("numberOfMetaRegions: " +
|
||||
master.regionManager.numMetaRegions() +
|
||||
", onlineMetaRegions.size(): " +
|
||||
master.regionManager.numOnlineMetaRegions());
|
||||
}
|
||||
if (master.numberOfMetaRegions.get() != master.onlineMetaRegions.size()) {
|
||||
if (master.regionManager.numMetaRegions() !=
|
||||
master.regionManager.numOnlineMetaRegions()) {
|
||||
// We can't proceed because not all of the meta regions are online.
|
||||
// We can't block either because that would prevent the meta region
|
||||
// online message from being processed. In order to prevent spinning
|
||||
|
|
|
@ -27,8 +27,8 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
|||
/** Scanner for the <code>ROOT</code> HRegion. */
|
||||
class RootScanner extends BaseScanner {
|
||||
/** Constructor */
|
||||
public RootScanner(HMaster master) {
|
||||
super(master, true, master.metaRescanInterval, master.closed);
|
||||
public RootScanner(HMaster master, RegionManager regionManager) {
|
||||
super(master, regionManager, true, master.metaRescanInterval, master.closed);
|
||||
}
|
||||
|
||||
private boolean scanRoot() {
|
||||
|
@ -36,27 +36,16 @@ class RootScanner extends BaseScanner {
|
|||
// caused by the server going away. Wait until next rescan interval when
|
||||
// things should be back to normal
|
||||
boolean scanSuccessful = false;
|
||||
synchronized (master.rootRegionLocation) {
|
||||
while(!master.closed.get() && master.rootRegionLocation.get() == null) {
|
||||
// rootRegionLocation will be filled in when we get an 'open region'
|
||||
// regionServerReport message from the HRegionServer that has been
|
||||
// allocated the ROOT region below.
|
||||
try {
|
||||
master.rootRegionLocation.wait();
|
||||
} catch (InterruptedException e) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
}
|
||||
master.waitForRootRegionLocation();
|
||||
if (master.closed.get()) {
|
||||
return scanSuccessful;
|
||||
}
|
||||
|
||||
try {
|
||||
// Don't interrupt us while we're working
|
||||
synchronized(master.rootScannerLock) {
|
||||
scanRegion(new MetaRegion(master.rootRegionLocation.get(),
|
||||
HRegionInfo.rootRegionInfo.getRegionName(), null));
|
||||
synchronized(scannerLock) {
|
||||
scanRegion(new MetaRegion(master.getRootRegionLocation(),
|
||||
HRegionInfo.rootRegionInfo.getRegionName(), null));
|
||||
}
|
||||
scanSuccessful = true;
|
||||
} catch (IOException e) {
|
||||
|
@ -74,8 +63,8 @@ class RootScanner extends BaseScanner {
|
|||
|
||||
@Override
|
||||
protected boolean initialScan() {
|
||||
master.rootScanned = scanRoot();
|
||||
return master.rootScanned;
|
||||
initialScanComplete = scanRoot();
|
||||
return initialScanComplete;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,674 @@
|
|||
/**
|
||||
* Copyright 2008 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.Set;
|
||||
import java.util.HashSet;
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.Collections;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.HServerInfo;
|
||||
import org.apache.hadoop.hbase.HServerLoad;
|
||||
import org.apache.hadoop.hbase.HServerAddress;
|
||||
import org.apache.hadoop.hbase.HMsg;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.Leases;
|
||||
import org.apache.hadoop.hbase.LeaseListener;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.io.Text;
|
||||
|
||||
/**
|
||||
* The ServerManager class manages info about region servers - HServerInfo,
|
||||
* load numbers, dying servers, etc.
|
||||
*/
|
||||
class ServerManager implements HConstants {
|
||||
/** Logger for this class. */
static final Log LOG = LogFactory.getLog(ServerManager.class.getName());
|
||||
|
||||
/** The map of known server names to server info */
|
||||
private final Map<String, HServerInfo> serversToServerInfo =
|
||||
new ConcurrentHashMap<String, HServerInfo>();
|
||||
|
||||
/** Set of known dead servers */
|
||||
private final Set<String> deadServers =
|
||||
Collections.synchronizedSet(new HashSet<String>());
|
||||
|
||||
/**
 * SortedMap server load -> Set of server names. Not private: code outside
 * this class synchronizes on it and reads it directly.
 */
|
||||
final SortedMap<HServerLoad, Set<String>> loadToServers =
|
||||
Collections.synchronizedSortedMap(new TreeMap<HServerLoad, Set<String>>());
|
||||
|
||||
/** Map of server names -> server load */
|
||||
private final Map<String, HServerLoad> serversToLoad =
|
||||
new ConcurrentHashMap<String, HServerLoad>();
|
||||
|
||||
/** The master this server manager works for; assigned in the constructor. */
private HMaster master;
|
||||
|
||||
/** Leases held for region servers; a lease is created when a server starts up. */
private final Leases serverLeases;
|
||||
|
||||
public ServerManager(HMaster master) {
|
||||
this.master = master;
|
||||
serverLeases = new Leases(master.leaseTimeout,
|
||||
master.conf.getInt("hbase.master.lease.thread.wakefrequency", 15 * 1000));
|
||||
}
|
||||
|
||||
/** Let the server manager know a new regionserver has come online */
|
||||
public void regionServerStartup(HServerInfo serverInfo) {
|
||||
String s = serverInfo.getServerAddress().toString().trim();
|
||||
LOG.info("received start message from: " + s);
|
||||
|
||||
HServerLoad load = serversToLoad.remove(s);
|
||||
if (load != null) {
|
||||
// The startup message was from a known server.
|
||||
// Remove stale information about the server's load.
|
||||
Set<String> servers = loadToServers.get(load);
|
||||
if (servers != null) {
|
||||
servers.remove(s);
|
||||
loadToServers.put(load, servers);
|
||||
}
|
||||
}
|
||||
|
||||
HServerInfo storedInfo = serversToServerInfo.remove(s);
|
||||
if (storedInfo != null && !master.closed.get()) {
|
||||
// The startup message was from a known server with the same name.
|
||||
// Timeout the old one right away.
|
||||
HServerAddress root = master.getRootRegionLocation();
|
||||
if (root != null && root.equals(storedInfo.getServerAddress())) {
|
||||
master.regionManager.unassignRootRegion();
|
||||
}
|
||||
master.delayedToDoQueue.put(new ProcessServerShutdown(master, storedInfo));
|
||||
}
|
||||
|
||||
// record new server
|
||||
load = new HServerLoad();
|
||||
serverInfo.setLoad(load);
|
||||
serversToServerInfo.put(s, serverInfo);
|
||||
serversToLoad.put(s, load);
|
||||
Set<String> servers = loadToServers.get(load);
|
||||
if (servers == null) {
|
||||
servers = new HashSet<String>();
|
||||
}
|
||||
servers.add(s);
|
||||
loadToServers.put(load, servers);
|
||||
|
||||
if (!master.closed.get()) {
|
||||
serverLeases.createLease(s, new ServerExpirer(s));
|
||||
}
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[])
|
||||
throws IOException {
|
||||
String serverName = serverInfo.getServerAddress().toString().trim();
|
||||
|
||||
if (msgs.length > 0) {
|
||||
if (msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) {
|
||||
processRegionServerExit(serverName, msgs);
|
||||
return new HMsg[]{msgs[0]};
|
||||
} else if (msgs[0].getMsg() == HMsg.MSG_REPORT_QUIESCED) {
|
||||
LOG.info("Region server " + serverName + " quiesced");
|
||||
master.quiescedMetaServers.incrementAndGet();
|
||||
}
|
||||
}
|
||||
|
||||
if(master.quiescedMetaServers.get() >= serversToServerInfo.size()) {
|
||||
// If the only servers we know about are meta servers, then we can
|
||||
// proceed with shutdown
|
||||
LOG.info("All user tables quiesced. Proceeding with shutdown");
|
||||
master.startShutdown();
|
||||
}
|
||||
|
||||
if (master.shutdownRequested && !master.closed.get()) {
|
||||
// Tell the server to stop serving any user regions
|
||||
return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_QUIESCE)};
|
||||
}
|
||||
|
||||
if (master.closed.get()) {
|
||||
// Tell server to shut down if we are shutting down. This should
|
||||
// happen after check of MSG_REPORT_EXITING above, since region server
|
||||
// will send us one of these messages after it gets MSG_REGIONSERVER_STOP
|
||||
return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)};
|
||||
}
|
||||
|
||||
HServerInfo storedInfo = serversToServerInfo.get(serverName);
|
||||
if (storedInfo == null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("received server report from unknown server: " + serverName);
|
||||
}
|
||||
|
||||
// The HBaseMaster may have been restarted.
|
||||
// Tell the RegionServer to start over and call regionServerStartup()
|
||||
return new HMsg[]{new HMsg(HMsg.MSG_CALL_SERVER_STARTUP)};
|
||||
} else if (storedInfo.getStartCode() != serverInfo.getStartCode()) {
|
||||
// This state is reachable if:
|
||||
//
|
||||
// 1) RegionServer A started
|
||||
// 2) RegionServer B started on the same machine, then
|
||||
// clobbered A in regionServerStartup.
|
||||
// 3) RegionServer A returns, expecting to work as usual.
|
||||
//
|
||||
// The answer is to ask A to shut down for good.
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("region server race condition detected: " + serverName);
|
||||
}
|
||||
|
||||
synchronized (serversToServerInfo) {
|
||||
cancelLease(serverName);
|
||||
serversToServerInfo.notifyAll();
|
||||
}
|
||||
|
||||
return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)};
|
||||
} else {
|
||||
return processRegionServerAllsWell(serverName, serverInfo, msgs);
|
||||
}
|
||||
}
|
||||
|
||||
/** Region server is exiting */
|
||||
private void processRegionServerExit(String serverName, HMsg[] msgs) {
|
||||
synchronized (serversToServerInfo) {
|
||||
try {
|
||||
// HRegionServer is shutting down. Cancel the server's lease.
|
||||
// Note that canceling the server's lease takes care of updating
|
||||
// serversToServerInfo, etc.
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Region server " + serverName +
|
||||
": MSG_REPORT_EXITING -- cancelling lease");
|
||||
}
|
||||
|
||||
if (cancelLease(serverName)) {
|
||||
// Only process the exit message if the server still has a lease.
|
||||
// Otherwise we could end up processing the server exit twice.
|
||||
LOG.info("Region server " + serverName +
|
||||
": MSG_REPORT_EXITING -- lease cancelled");
|
||||
// Get all the regions the server was serving reassigned
|
||||
// (if we are not shutting down).
|
||||
if (!master.closed.get()) {
|
||||
for (int i = 1; i < msgs.length; i++) {
|
||||
HRegionInfo info = msgs[i].getRegionInfo();
|
||||
if (info.isRootRegion()) {
|
||||
master.regionManager.unassignRootRegion();
|
||||
} else if (info.isMetaTable()) {
|
||||
master.regionManager.offlineMetaRegion(info.getStartKey());
|
||||
}
|
||||
|
||||
master.regionManager.setUnassigned(info);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We don't need to return anything to the server because it isn't
|
||||
// going to do any more work.
|
||||
/* return new HMsg[0];*/
|
||||
} finally {
|
||||
serversToServerInfo.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** RegionServer is checking in, no exceptional circumstances */
|
||||
private HMsg[] processRegionServerAllsWell(String serverName,
|
||||
HServerInfo serverInfo, HMsg[] msgs)
|
||||
throws IOException {
|
||||
// All's well. Renew the server's lease.
|
||||
// This will always succeed; otherwise, the fetch of serversToServerInfo
|
||||
// would have failed above.
|
||||
serverLeases.renewLease(serverName);
|
||||
|
||||
// Refresh the info object and the load information
|
||||
serversToServerInfo.put(serverName, serverInfo);
|
||||
|
||||
HServerLoad load = serversToLoad.get(serverName);
|
||||
if (load != null && !load.equals(serverInfo.getLoad())) {
|
||||
// We have previous information about the load on this server
|
||||
// and the load on this server has changed
|
||||
Set<String> servers = loadToServers.get(load);
|
||||
|
||||
// Note that servers should never be null because loadToServers
|
||||
// and serversToLoad are manipulated in pairs
|
||||
servers.remove(serverName);
|
||||
loadToServers.put(load, servers);
|
||||
}
|
||||
|
||||
// Set the current load information
|
||||
load = serverInfo.getLoad();
|
||||
serversToLoad.put(serverName, load);
|
||||
Set<String> servers = loadToServers.get(load);
|
||||
if (servers == null) {
|
||||
servers = new HashSet<String>();
|
||||
}
|
||||
servers.add(serverName);
|
||||
loadToServers.put(load, servers);
|
||||
|
||||
// Next, process messages for this server
|
||||
return processMsgs(serverName, serverInfo, msgs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Process all the incoming messages from a server that's contacted us.
|
||||
*
|
||||
* Note that we never need to update the server's load information because
|
||||
* that has already been done in regionServerReport.
|
||||
*/
|
||||
private HMsg[] processMsgs(String serverName, HServerInfo serverInfo,
|
||||
HMsg incomingMsgs[])
|
||||
throws IOException {
|
||||
ArrayList<HMsg> returnMsgs = new ArrayList<HMsg>();
|
||||
Map<Text, HRegionInfo> regionsToKill =
|
||||
master.regionManager.getMarkedClosedNoReopen(serverName);
|
||||
|
||||
// Get reports on what the RegionServer did.
|
||||
for (int i = 0; i < incomingMsgs.length; i++) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Received " + incomingMsgs[i].toString() + " from " +
|
||||
serverName);
|
||||
}
|
||||
HRegionInfo region = incomingMsgs[i].getRegionInfo();
|
||||
|
||||
switch (incomingMsgs[i].getMsg()) {
|
||||
case HMsg.MSG_REPORT_PROCESS_OPEN:
|
||||
master.regionManager.updateAssignmentDeadline(region);
|
||||
break;
|
||||
|
||||
case HMsg.MSG_REPORT_OPEN:
|
||||
processRegionOpen(serverName, serverInfo,
|
||||
incomingMsgs[i].getRegionInfo(), returnMsgs);
|
||||
break;
|
||||
|
||||
case HMsg.MSG_REPORT_CLOSE:
|
||||
LOG.info(serverInfo.getServerAddress().toString() + " no longer serving " +
|
||||
region.getRegionName());
|
||||
|
||||
if (region.isRootRegion()) {
|
||||
// Root region
|
||||
if (region.isOffline()) {
|
||||
// Can't proceed without root region. Shutdown.
|
||||
LOG.fatal("root region is marked offline");
|
||||
master.shutdown();
|
||||
}
|
||||
master.regionManager.unassignRootRegion();
|
||||
|
||||
} else {
|
||||
boolean reassignRegion = !region.isOffline();
|
||||
boolean deleteRegion = false;
|
||||
|
||||
if (master.regionManager.isClosing(region.getRegionName())) {
|
||||
master.regionManager.noLongerClosing(region.getRegionName());
|
||||
reassignRegion = false;
|
||||
}
|
||||
|
||||
if (master.regionManager.isMarkedForDeletion(region.getRegionName())) {
|
||||
master.regionManager.regionDeleted(region.getRegionName());
|
||||
reassignRegion = false;
|
||||
deleteRegion = true;
|
||||
}
|
||||
|
||||
if (region.isMetaTable()) {
|
||||
// Region is part of the meta table. Remove it from onlineMetaRegions
|
||||
master.regionManager.offlineMetaRegion(region.getStartKey());
|
||||
}
|
||||
|
||||
// NOTE: we cannot put the region into unassignedRegions as that
|
||||
// could create a race with the pending close if it gets
|
||||
// reassigned before the close is processed.
|
||||
|
||||
master.regionManager.noLongerUnassigned(region);
|
||||
|
||||
try {
|
||||
master.toDoQueue.put(new ProcessRegionClose(master, region,
|
||||
reassignRegion, deleteRegion));
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(
|
||||
"Putting into toDoQueue was interrupted.", e);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case HMsg.MSG_REPORT_SPLIT:
|
||||
processSplitRegion(serverName, serverInfo, region, incomingMsgs[++i],
|
||||
incomingMsgs[++i], returnMsgs);
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new IOException(
|
||||
"Impossible state during msg processing. Instruction: " +
|
||||
incomingMsgs[i].getMsg());
|
||||
}
|
||||
}
|
||||
|
||||
// Process the kill list
|
||||
|
||||
if (regionsToKill != null) {
|
||||
for (HRegionInfo i: regionsToKill.values()) {
|
||||
returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE, i));
|
||||
master.regionManager.setClosing(i.getRegionName());
|
||||
}
|
||||
}
|
||||
|
||||
// Figure out what the RegionServer ought to do, and write back.
|
||||
master.regionManager.assignRegions(serverInfo, serverName, returnMsgs);
|
||||
return returnMsgs.toArray(new HMsg[returnMsgs.size()]);
|
||||
}
|
||||
|
||||
/** A region has split. **/
|
||||
private void processSplitRegion(String serverName, HServerInfo serverInfo,
|
||||
HRegionInfo region, HMsg splitA, HMsg splitB, ArrayList<HMsg> returnMsgs) {
|
||||
|
||||
HRegionInfo newRegionA = splitA.getRegionInfo();
|
||||
master.regionManager.setUnassigned(newRegionA);
|
||||
|
||||
HRegionInfo newRegionB = splitB.getRegionInfo();
|
||||
master.regionManager.setUnassigned(newRegionB);
|
||||
|
||||
LOG.info("region " + region.getRegionName() + " split. New regions are: " +
|
||||
newRegionA.getRegionName() + ", " + newRegionB.getRegionName());
|
||||
|
||||
if (region.isMetaTable()) {
|
||||
// A meta region has split.
|
||||
master.regionManager.offlineMetaRegion(region.getStartKey());
|
||||
master.regionManager.incrementNumMetaRegions();
|
||||
}
|
||||
}
|
||||
|
||||
/** Region server is reporting that a region is now opened */
|
||||
private void processRegionOpen(String serverName, HServerInfo serverInfo,
|
||||
HRegionInfo region, ArrayList<HMsg> returnMsgs)
|
||||
throws IOException {
|
||||
boolean duplicateAssignment = false;
|
||||
|
||||
if (!master.regionManager.isUnassigned(region)) {
|
||||
if (region.isRootRegion()) {
|
||||
// Root region
|
||||
HServerAddress rootServer = master.getRootRegionLocation();
|
||||
if (rootServer != null) {
|
||||
if (rootServer.toString().compareTo(serverName) == 0) {
|
||||
// A duplicate open report from the correct server
|
||||
return;
|
||||
}
|
||||
// We received an open report on the root region, but it is
|
||||
// assigned to a different server
|
||||
duplicateAssignment = true;
|
||||
}
|
||||
} else {
|
||||
// Not root region. If it is not a pending region, then we are
|
||||
// going to treat it as a duplicate assignment, although we can't
|
||||
// tell for certain that's the case.
|
||||
if (master.regionManager.isPending(region.getRegionName())) {
|
||||
// A duplicate report from the correct server
|
||||
return;
|
||||
}
|
||||
duplicateAssignment = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (duplicateAssignment) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("region server " + serverInfo.getServerAddress().toString()
|
||||
+ " should not have opened region " + region.getRegionName());
|
||||
}
|
||||
|
||||
// This Region should not have been opened.
|
||||
// Ask the server to shut it down, but don't report it as closed.
|
||||
// Otherwise the HMaster will think the Region was closed on purpose,
|
||||
// and then try to reopen it elsewhere; that's not what we want.
|
||||
returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT, region));
|
||||
} else {
|
||||
LOG.info(serverInfo.getServerAddress().toString() + " serving " +
|
||||
region.getRegionName());
|
||||
|
||||
// it was assigned, and it's not a duplicate assignment, so take it out
|
||||
// of the unassigned list.
|
||||
master.regionManager.noLongerUnassigned(region);
|
||||
|
||||
if (region.isRootRegion()) {
|
||||
// Store the Root Region location (in memory)
|
||||
master.regionManager.setRootRegionLocation(serverInfo.getServerAddress());
|
||||
} else {
|
||||
// Note that the table has been assigned and is waiting for the
|
||||
// meta table to be updated.
|
||||
master.regionManager.setPending(region.getRegionName());
|
||||
|
||||
// Queue up an update to note the region location.
|
||||
try {
|
||||
master.toDoQueue.put(
|
||||
new ProcessRegionOpen(master, serverInfo, region));
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(
|
||||
"Putting into toDoQueue was interrupted.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Region server reporting that it has closed a region */
|
||||
private void processRegionClose(String serverName, HServerInfo info,
|
||||
HRegionInfo region) {
|
||||
LOG.info(info.getServerAddress().toString() + " no longer serving " +
|
||||
region.getRegionName());
|
||||
|
||||
if (region.isRootRegion()) {
|
||||
if (region.isOffline()) {
|
||||
// Can't proceed without root region. Shutdown.
|
||||
LOG.fatal("root region is marked offline");
|
||||
master.shutdown();
|
||||
}
|
||||
master.regionManager.unassignRootRegion();
|
||||
} else {
|
||||
boolean reassignRegion = !region.isOffline();
|
||||
boolean deleteRegion = false;
|
||||
|
||||
if (master.regionManager.isClosing(region.getRegionName())) {
|
||||
master.regionManager.noLongerClosing(region.getRegionName());
|
||||
reassignRegion = false;
|
||||
}
|
||||
|
||||
if (master.regionManager.isMarkedForDeletion(region.getRegionName())) {
|
||||
master.regionManager.regionDeleted(region.getRegionName());
|
||||
reassignRegion = false;
|
||||
deleteRegion = true;
|
||||
}
|
||||
|
||||
if (region.isMetaTable()) {
|
||||
// Region is part of the meta table. Remove it from onlineMetaRegions
|
||||
master.regionManager.offlineMetaRegion(region.getStartKey());
|
||||
}
|
||||
|
||||
// NOTE: we cannot put the region into unassignedRegions as that
|
||||
// could create a race with the pending close if it gets
|
||||
// reassigned before the close is processed.
|
||||
master.regionManager.noLongerUnassigned(region);
|
||||
|
||||
try {
|
||||
master.toDoQueue.put(new ProcessRegionClose(master, region, reassignRegion,
|
||||
deleteRegion));
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(
|
||||
"Putting into toDoQueue was interrupted.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Cancel a server's lease and update its load information */
|
||||
private boolean cancelLease(final String serverName) {
|
||||
boolean leaseCancelled = false;
|
||||
HServerInfo info = serversToServerInfo.remove(serverName);
|
||||
if (info != null) {
|
||||
// Only cancel lease and update load information once.
|
||||
// This method can be called a couple of times during shutdown.
|
||||
if (master.getRootRegionLocation() != null &&
|
||||
info.getServerAddress().equals(master.getRootRegionLocation())) {
|
||||
master.regionManager.unassignRootRegion();
|
||||
}
|
||||
LOG.info("Cancelling lease for " + serverName);
|
||||
serverLeases.cancelLease(serverName);
|
||||
leaseCancelled = true;
|
||||
|
||||
// update load information
|
||||
HServerLoad load = serversToLoad.remove(serverName);
|
||||
if (load != null) {
|
||||
Set<String> servers = loadToServers.get(load);
|
||||
if (servers != null) {
|
||||
servers.remove(serverName);
|
||||
loadToServers.put(load, servers);
|
||||
}
|
||||
}
|
||||
}
|
||||
return leaseCancelled;
|
||||
}
|
||||
|
||||
|
||||
/**
 * Compute the average load across all region servers.
 * NOTE(review): the body is a stub that always returns 0 and does not
 * consult any load information - confirm callers tolerate this.
 *
 * @return always 0 in this implementation
 */
|
||||
public int averageLoad() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
public int numServers() {
|
||||
return serversToServerInfo.size();
|
||||
}
|
||||
|
||||
/** get HServerInfo from a server address */
|
||||
public HServerInfo getServerInfo(String address) {
|
||||
return serversToServerInfo.get(address);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Read-only map of servers to serverinfo.
|
||||
*/
|
||||
public Map<String, HServerInfo> getServersToServerInfo() {
|
||||
return Collections.unmodifiableMap(serversToServerInfo);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Read-only map of servers to load.
|
||||
*/
|
||||
public Map<String, HServerLoad> getServersToLoad() {
|
||||
return Collections.unmodifiableMap(serversToLoad);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Read-only map of load to servers.
|
||||
*/
|
||||
public Map<HServerLoad, Set<String>> getLoadToServers() {
|
||||
return Collections.unmodifiableMap(loadToServers);
|
||||
}
|
||||
|
||||
public void notifyServers() {
|
||||
synchronized (serversToServerInfo) {
|
||||
serversToServerInfo.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Wait on regionservers to report in
|
||||
* with {@link #regionServerReport(HServerInfo, HMsg[])} so they get notice
|
||||
* the master is going down. Waits until all region servers come back with
|
||||
* a MSG_REGIONSERVER_STOP which will cancel their lease or until leases held
|
||||
* by remote region servers have expired.
|
||||
*/
|
||||
void letRegionServersShutdown() {
|
||||
if (!master.fsOk) {
|
||||
// Forget waiting for the region servers if the file system has gone
|
||||
// away. Just exit as quickly as possible.
|
||||
return;
|
||||
}
|
||||
synchronized (serversToServerInfo) {
|
||||
while (serversToServerInfo.size() > 0) {
|
||||
LOG.info("Waiting on following regionserver(s) to go down (or " +
|
||||
"region server lease expiration, whichever happens first): " +
|
||||
serversToServerInfo.values());
|
||||
try {
|
||||
serversToServerInfo.wait(master.threadWakeFrequency);
|
||||
} catch (InterruptedException e) {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Instantiated to monitor the health of a region server */
|
||||
private class ServerExpirer implements LeaseListener {
|
||||
@SuppressWarnings("hiding")
|
||||
private String server;
|
||||
|
||||
ServerExpirer(String server) {
|
||||
this.server = server;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public void leaseExpired() {
|
||||
LOG.info(server + " lease expired");
|
||||
// Remove the server from the known servers list and update load info
|
||||
HServerInfo info = serversToServerInfo.remove(server);
|
||||
if (info != null) {
|
||||
HServerAddress root = master.getRootRegionLocation();
|
||||
if (root != null && root.equals(info.getServerAddress())) {
|
||||
master.regionManager.unassignRootRegion();
|
||||
}
|
||||
String serverName = info.getServerAddress().toString();
|
||||
HServerLoad load = serversToLoad.remove(serverName);
|
||||
if (load != null) {
|
||||
Set<String> servers = loadToServers.get(load);
|
||||
if (servers != null) {
|
||||
servers.remove(serverName);
|
||||
loadToServers.put(load, servers);
|
||||
}
|
||||
}
|
||||
deadServers.add(server);
|
||||
}
|
||||
synchronized (serversToServerInfo) {
|
||||
serversToServerInfo.notifyAll();
|
||||
}
|
||||
|
||||
// NOTE: If the server was serving the root region, we cannot reassign it
|
||||
// here because the new server will start serving the root region before
|
||||
// the ProcessServerShutdown operation has a chance to split the log file.
|
||||
if (info != null) {
|
||||
master.delayedToDoQueue.put(new ProcessServerShutdown(master, info));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Start up the server manager */
|
||||
public void start() {
|
||||
// Leases are not the same as Chore threads. Set name differently.
|
||||
this.serverLeases.setName("ServerManager.leaseChecker");
|
||||
this.serverLeases.start();
|
||||
}
|
||||
|
||||
/** Shut down the server manager */
|
||||
public void stop() {
|
||||
// stop monitor lease monitor
|
||||
serverLeases.close();
|
||||
}
|
||||
|
||||
public void removeDeadServer(String serverName) {
|
||||
deadServers.remove(serverName);
|
||||
}
|
||||
|
||||
public boolean isDead(String serverName) {
|
||||
return deadServers.contains(serverName);
|
||||
}
|
||||
}
|
|
@ -47,7 +47,7 @@ class TableDelete extends ChangeTableState {
|
|||
// For regions that are being served, mark them for deletion
|
||||
for (HashSet<HRegionInfo> s: servedRegions.values()) {
|
||||
for (HRegionInfo i: s) {
|
||||
this.master.regionsToDelete.add(i.getRegionName());
|
||||
master.regionManager.markRegionForDeletion(i.getRegionName());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -36,6 +36,11 @@ import org.apache.hadoop.hbase.io.HbaseMapWritable;
|
|||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.io.Text;
|
||||
|
||||
/**
|
||||
* Abstract base class for operations that need to examine all HRegionInfo
|
||||
* objects that make up a table. (For a table, operate on each of its rows
|
||||
* in .META.) To gain the
|
||||
*/
|
||||
abstract class TableOperation implements HConstants {
|
||||
static final Long ZERO_L = Long.valueOf(0L);
|
||||
|
||||
|
@ -58,31 +63,18 @@ abstract class TableOperation implements HConstants {
|
|||
throw new MasterNotRunningException();
|
||||
}
|
||||
|
||||
this.metaRegions = new HashSet<MetaRegion>();
|
||||
this.tableName = tableName;
|
||||
this.unservedRegions = new HashSet<HRegionInfo>();
|
||||
|
||||
// We can not access any meta region if they have not already been
|
||||
// assigned and scanned.
|
||||
|
||||
if (this.master.metaScannerThread.waitForMetaRegionsOrClose()) {
|
||||
throw new MasterNotRunningException(); // We're shutting down. Forget it.
|
||||
if (master.regionManager.metaScannerThread.waitForMetaRegionsOrClose()) {
|
||||
// We're shutting down. Forget it.
|
||||
throw new MasterNotRunningException();
|
||||
}
|
||||
|
||||
Text firstMetaRegion = null;
|
||||
synchronized (this.master.onlineMetaRegions) {
|
||||
if (this.master.onlineMetaRegions.size() == 1) {
|
||||
firstMetaRegion = this.master.onlineMetaRegions.firstKey();
|
||||
|
||||
} else if (this.master.onlineMetaRegions.containsKey(tableName)) {
|
||||
firstMetaRegion = tableName;
|
||||
|
||||
} else {
|
||||
firstMetaRegion = this.master.onlineMetaRegions.headMap(tableName).lastKey();
|
||||
}
|
||||
this.metaRegions.addAll(this.master.onlineMetaRegions.tailMap(
|
||||
firstMetaRegion).values());
|
||||
}
|
||||
this.metaRegions = master.regionManager.getMetaRegionsForTable(tableName);
|
||||
}
|
||||
|
||||
void process() throws IOException {
|
||||
|
@ -90,19 +82,16 @@ abstract class TableOperation implements HConstants {
|
|||
boolean tableExists = false;
|
||||
try {
|
||||
// Prevent meta scanner from running
|
||||
synchronized(this.master.metaScannerLock) {
|
||||
synchronized(master.regionManager.metaScannerThread.scannerLock) {
|
||||
for (MetaRegion m: metaRegions) {
|
||||
|
||||
// Get a connection to a meta server
|
||||
|
||||
HRegionInterface server =
|
||||
this.master.connection.getHRegionConnection(m.getServer());
|
||||
master.connection.getHRegionConnection(m.getServer());
|
||||
|
||||
// Open a scanner on the meta region
|
||||
|
||||
long scannerId =
|
||||
server.openScanner(m.getRegionName(), COLUMN_FAMILY_ARRAY,
|
||||
tableName, System.currentTimeMillis(), null);
|
||||
tableName, System.currentTimeMillis(), null);
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
|
@ -166,7 +155,7 @@ abstract class TableOperation implements HConstants {
|
|||
protected boolean isBeingServed(String serverName, long startCode) {
|
||||
boolean result = false;
|
||||
if (serverName != null && serverName.length() > 0 && startCode != -1L) {
|
||||
HServerInfo s = this.master.serversToServerInfo.get(serverName);
|
||||
HServerInfo s = master.serverManager.getServerInfo(serverName);
|
||||
result = s != null && s.getStartCode() == startCode;
|
||||
}
|
||||
return result;
|
||||
|
|
|
@ -68,7 +68,17 @@ public class SoftSortedMap<K,V> implements SortedMap<K,V> {
|
|||
public V get(Object key) {
|
||||
checkReferences();
|
||||
SoftValue<K,V> value = internalMap.get(key);
|
||||
return value == null ? null : value.get();
|
||||
|
||||
if (value == null) {
|
||||
return null;
|
||||
} else {
|
||||
if (value.get() == null) {
|
||||
internalMap.remove(key);
|
||||
return null;
|
||||
} else {
|
||||
return value.get();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public V remove(Object key) {
|
||||
|
@ -83,8 +93,9 @@ public class SoftSortedMap<K,V> implements SortedMap<K,V> {
|
|||
}
|
||||
|
||||
public boolean containsValue(Object value) {
|
||||
checkReferences();
|
||||
return internalMap.containsValue(value);
|
||||
/* checkReferences();
|
||||
return internalMap.containsValue(value);*/
|
||||
throw new UnsupportedOperationException("Don't support containsValue!");
|
||||
}
|
||||
|
||||
public K firstKey() {
|
||||
|
|
|
@ -35,20 +35,20 @@ public class Threads {
|
|||
* @return Returns the passed Thread <code>t</code>.
|
||||
*/
|
||||
public static Thread setDaemonThreadRunning(final Thread t,
|
||||
final String name) {
|
||||
final String name) {
|
||||
return setDaemonThreadRunning(t, name, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method that sets name, daemon status and starts passed thread.
|
||||
* @param t
|
||||
* @param name
|
||||
* @param handler A handler to set on the thread. Pass null if want to
|
||||
* use default handler.
|
||||
* @return Returns the passed Thread <code>t</code>.
|
||||
*/
|
||||
public static Thread setDaemonThreadRunning(final Thread t,
|
||||
final String name, final UncaughtExceptionHandler handler) {
|
||||
/**
|
||||
* Utility method that sets name, daemon status and starts passed thread.
|
||||
* @param t
|
||||
* @param name
|
||||
* @param handler A handler to set on the thread. Pass null if want to
|
||||
* use default handler.
|
||||
* @return Returns the passed Thread <code>t</code>.
|
||||
*/
|
||||
public static Thread setDaemonThreadRunning(final Thread t,
|
||||
final String name, final UncaughtExceptionHandler handler) {
|
||||
t.setName(name);
|
||||
if (handler != null) {
|
||||
t.setUncaughtExceptionHandler(handler);
|
||||
|
|
|
@ -147,7 +147,7 @@ public class MultiRegionTable extends HBaseTestCase {
|
|||
Writables.getHRegionInfoOrNull(data.get(HConstants.COL_SPLITA));
|
||||
HRegionInfo splitB =
|
||||
Writables.getHRegionInfoOrNull(data.get(HConstants.COL_SPLITB));
|
||||
assertTrue(fs.exists(parentDir));
|
||||
assertTrue("parentDir should exist", fs.exists(parentDir));
|
||||
LOG.info("Split happened. Parent is " + parent.getRegionName());
|
||||
|
||||
// Recalibrate will cause us to wait on new regions' deployment
|
||||
|
|
Loading…
Reference in New Issue