HBASE-4455 Rolling restart RSs scenario, -ROOT-, .META. regions are lost in AssignmentManager (Ming Ma)


git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1176613 13f79535-47bb-0310-9956-ffa450edef68
Zhihong Yu 2011-09-27 20:49:04 +00:00
parent 9bfdd1d91c
commit 7416d6015b
16 changed files with 258 additions and 144 deletions

View File

@@ -307,6 +307,8 @@ Release 0.92.0 - Unreleased
HBASE-4468 Wrong resource name in an error message: webapps instead of
hbase-webapps (nkeywal)
HBASE-4472 MiniHBaseCluster.shutdown() doesn't work if no active master
HBASE-4455 Rolling restart RSs scenario, -ROOT-, .META. regions are lost in
AssignmentManager (Ming Ma)
TESTS
HBASE-4450 test for number of blocks read: to serve as baseline for expected

View File

@@ -58,7 +58,7 @@ public class MasterAddressTracker extends ZooKeeperNodeTracker {
* @return Server name or null if none is set.
*/
public ServerName getMasterAddress() {
byte [] data = super.getData();
byte [] data = super.getData(false);
return data == null ? null : new ServerName(Bytes.toString(data));
}
@@ -67,21 +67,7 @@ public class MasterAddressTracker extends ZooKeeperNodeTracker {
* @return true if there is a master set, false if not.
*/
public boolean hasMaster() {
return super.getData() != null;
return super.getData(false) != null;
}
/**
* Get the address of the current master. If no master is available, method
* will block until one is available, the thread is interrupted, or timeout
* has passed.
*
* @param timeout maximum time to wait for master in millis, 0 for forever
* @return String of master host and port or null if timed out.
* @throws InterruptedException if the thread is interrupted while waiting
*/
public synchronized ServerName waitForMaster(long timeout)
throws InterruptedException {
byte [] data = super.blockUntilAvailable();
return data == null ? null : new ServerName(Bytes.toString(data));
}
}
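With the blocking waitForMaster() variant removed, callers now do plain cached reads against the tracker. A minimal caller sketch, assuming the 0.92-era constructor taking a ZooKeeperWatcher and an Abortable (the wrapper class and method names are illustrative only):

import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

final class MasterLookupExample {
  // Hedged sketch: the caller supplies a live watcher and an Abortable.
  static ServerName currentMaster(ZooKeeperWatcher watcher, Abortable abortable) {
    MasterAddressTracker tracker = new MasterAddressTracker(watcher, abortable);
    tracker.start();                   // register listener, prime the cache
    if (!tracker.hasMaster()) {        // cached read: getData(false)
      return null;                     // no master registered in ZK yet
    }
    return tracker.getMasterAddress(); // also a cached read
  }
}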

View File

@@ -213,8 +213,7 @@ public class CatalogTracker {
* for up to the specified timeout if not immediately available. Returns null
* if the timeout elapses before root is available.
* @param timeout maximum time to wait for root availability, in milliseconds
* @return Location of server hosting root region,
* or null if none available
* @return Location of server hosting root region or null if none available
* @throws InterruptedException if interrupted while waiting
* @throws NotAllMetaRegionsOnlineException if root not available before
* timeout
@@ -269,9 +268,6 @@ public class CatalogTracker {
private HRegionInterface getRootServerConnection()
throws IOException, InterruptedException {
ServerName sn = this.rootRegionTracker.getRootRegionLocation();
if (sn == null) {
return null;
}
return getCachedConnection(sn);
}
@@ -291,14 +287,11 @@ public class CatalogTracker {
* @throws IOException
* @throws InterruptedException
*/
private HRegionInterface getMetaServerConnection(boolean refresh)
private HRegionInterface getMetaServerConnection()
throws IOException, InterruptedException {
synchronized (metaAvailable) {
if (metaAvailable.get()) {
HRegionInterface current = getCachedConnection(metaLocation);
if (!refresh) {
return current;
}
HRegionInterface current = getCachedConnection(this.metaLocation);
if (verifyRegionLocation(current, this.metaLocation, META_REGION)) {
return current;
}
@@ -306,17 +299,22 @@ public class CatalogTracker {
}
HRegionInterface rootConnection = getRootServerConnection();
if (rootConnection == null) {
LOG.debug("-ROOT- server unavailable.");
return null;
}
ServerName newLocation = MetaReader.readMetaLocation(rootConnection);
if (newLocation == null) {
LOG.debug(".META. server unavailable.");
return null;
}
HRegionInterface newConnection = getCachedConnection(newLocation);
if (verifyRegionLocation(newConnection, this.metaLocation, META_REGION)) {
if (verifyRegionLocation(newConnection, newLocation, META_REGION)) {
setMetaLocation(newLocation);
return newConnection;
} else {
LOG.debug("new .META. server: " + newLocation + " isn't valid." +
" Cached .META. server: " + this.metaLocation);
}
return null;
}
@@ -339,8 +337,8 @@ public class CatalogTracker {
* Gets the current location for <code>.META.</code> if available and waits
* for up to the specified timeout if not immediately available. Throws an
* exception if timed out waiting. This method differs from {@link #waitForMeta()}
* in that it will go ahead and verify the location gotten from ZooKeeper by
* trying to use returned connection.
* in that it will go ahead and verify the location gotten from ZooKeeper and
* -ROOT- region by trying to use returned connection.
* @param timeout maximum time to wait for meta availability, in milliseconds
* @return location of meta
* @throws InterruptedException if interrupted while waiting
@@ -351,15 +349,16 @@ public class CatalogTracker {
public ServerName waitForMeta(long timeout)
throws InterruptedException, IOException, NotAllMetaRegionsOnlineException {
long stop = System.currentTimeMillis() + timeout;
long waitTime = Math.min(500, timeout);
synchronized (metaAvailable) {
while(!stopped && !metaAvailable.get() &&
(timeout == 0 || System.currentTimeMillis() < stop)) {
if (getMetaServerConnection(true) != null) {
while(!stopped && (timeout == 0 || System.currentTimeMillis() < stop)) {
if (getMetaServerConnection() != null) {
return metaLocation;
}
metaAvailable.wait(timeout == 0 ? 50 : timeout);
// perhaps -ROOT- region isn't available, let us wait a bit and retry.
metaAvailable.wait(waitTime);
}
if (getMetaServerConnection(true) == null) {
if (getMetaServerConnection() == null) {
throw new NotAllMetaRegionsOnlineException(
"Timed out (" + timeout + "ms)");
}
@@ -399,11 +398,13 @@ public class CatalogTracker {
}
private void resetMetaLocation() {
LOG.info("Current cached META location is not valid, resetting");
LOG.debug("Current cached META location: " + metaLocation +
" is not valid, resetting");
this.metaAvailable.set(false);
}
private void setMetaLocation(final ServerName metaLocation) {
LOG.debug("set new cached META location: " + metaLocation);
metaAvailable.set(true);
this.metaLocation = metaLocation;
// no synchronization because these are private and already under lock
@@ -412,6 +413,9 @@ public class CatalogTracker {
private HRegionInterface getCachedConnection(ServerName sn)
throws IOException {
if (sn == null) {
return null;
}
HRegionInterface protocol = null;
try {
protocol = connection.getHRegionConnection(sn.getHostname(), sn.getPort());
@@ -491,9 +495,6 @@ public class CatalogTracker {
// Pass
} catch (ServerNotRunningYetException e) {
// Pass -- remote server is not up so can't be carrying root
} catch (IOException e) {
// Unexpected exception
throw e;
}
return (connection == null)? false:
verifyRegionLocation(connection,
@@ -511,7 +512,15 @@ public class CatalogTracker {
*/
public boolean verifyMetaRegionLocation(final long timeout)
throws InterruptedException, IOException {
return getMetaServerConnection(true) != null;
HRegionInterface connection = null;
try {
connection = waitForMetaServerConnection(timeout);
} catch (NotAllMetaRegionsOnlineException e) {
// Pass
} catch (ServerNotRunningYetException e) {
// Pass -- remote server is not up so can't be carrying .META.
}
return connection != null;
}
MetaNodeTracker getMetaNodeTracker() {
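Since getMetaServerConnection() now always verifies through -ROOT-, the two verify methods become the safe probes for callers; failures surface as false rather than as NotAllMetaRegionsOnlineException. A hedged caller sketch, assuming an already-started CatalogTracker (the wrapper class is illustrative only):

import java.io.IOException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;

final class CatalogProbeExample {
  // Hedged sketch: `ct` is an already-started CatalogTracker.
  static boolean catalogOnline(CatalogTracker ct)
      throws IOException, InterruptedException {
    boolean rootOk = ct.verifyRootRegionLocation(30 * 1000L); // millis
    boolean metaOk = ct.verifyMetaRegionLocation(30 * 1000L);
    return rootOk && metaOk; // false: absent, or failed verification in time
  }
}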

View File

@@ -606,15 +606,15 @@ public class AssignmentManager extends ZooKeeperListener {
handleHBCK(data);
return;
}
String encodedName = HRegionInfo.encodeRegionName(data.getRegionName());
String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
// Verify this is a known server
if (!serverManager.isServerOnline(sn) &&
!this.master.getServerName().equals(sn)) {
LOG.warn("Attempted to handle region transition for server but " +
"server is not online: " + Bytes.toString(data.getRegionName()));
"server is not online: " + prettyPrintedRegionName);
return;
}
String encodedName = HRegionInfo.encodeRegionName(data.getRegionName());
String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
// Printing if the event was created a long time ago helps debugging
boolean lateEvent = data.getStamp() <
(System.currentTimeMillis() - 15000);
@@ -1427,6 +1427,9 @@ public class AssignmentManager extends ZooKeeperListener {
// Remove region from in-memory transition and unassigned node from ZK
// While trying to enable the table the regions of the table were
// already enabled.
debugLog(state.getRegion(),
"ALREADY_OPENED region " + state.getRegion().getRegionNameAsString() +
" to " + plan.getDestination().toString());
String encodedRegionName = state.getRegion()
.getEncodedName();
try {
@@ -2527,6 +2530,53 @@ public class AssignmentManager extends ZooKeeperListener {
threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
}
public boolean isCarryingRoot(ServerName serverName) {
return isCarryingRegion(serverName, HRegionInfo.ROOT_REGIONINFO);
}
public boolean isCarryingMeta(ServerName serverName) {
return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
}
/**
* Check if the shutdown server carries the specific region.
* We have a bunch of places that store region locations, and those values
* aren't always consistent because notification is delayed.
* The location from the zookeeper unassigned node has the most recent data,
* but the node could be deleted after the region is opened by the AM.
* The AM's info could be stale if OpenedRegionHandler processing hasn't
* finished by the time the server shutdown occurs.
* @return whether the serverName currently hosts the region
*/
public boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
RegionTransitionData data = null;
try {
data = ZKAssign.getData(master.getZooKeeper(), hri.getEncodedName());
} catch (KeeperException e) {
master.abort("Unexpected ZK exception reading unassigned node for region="
+ hri.getEncodedName(), e);
}
ServerName addressFromZK = (data != null && data.getOrigin() != null) ?
data.getOrigin() : null;
if (addressFromZK != null) {
// if we get something from ZK, we will use the data
boolean matchZK = (addressFromZK != null &&
addressFromZK.equals(serverName));
LOG.debug("based on ZK, current region=" + hri.getRegionNameAsString() +
" is on server=" + addressFromZK +
" server being checked=: " + serverName);
return matchZK;
}
ServerName addressFromAM = getRegionServerOfRegion(hri);
boolean matchAM = (addressFromAM != null &&
addressFromAM.equals(serverName));
LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() +
" is on server=" + (addressFromAM != null ? addressFromAM : "null") +
" server being checked: " + serverName);
return matchAM;
}
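The decision order above (trust the unassigned znode first, fall back to the AM's in-memory map) is what lets the shutdown path stop consulting the CatalogTracker, whose cached location may already have been reset. A condensed sketch of the intended caller side, mirroring the ServerManager change later in this commit (the wrapper class is illustrative only):

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.AssignmentManager;

final class ShutdownDispatchExample {
  private final AssignmentManager assignmentManager;

  ShutdownDispatchExample(AssignmentManager am) {
    this.assignmentManager = am;
  }

  // Pick the handler from the AM's view of -ROOT-/.META. placement.
  void onServerShutdown(ServerName sn) {
    boolean carryingRoot = assignmentManager.isCarryingRoot(sn);
    boolean carryingMeta = assignmentManager.isCarryingMeta(sn);
    if (carryingRoot || carryingMeta) {
      // submit a MetaServerShutdownHandler(carryingRoot, carryingMeta)
    } else {
      // submit a plain ServerShutdownHandler (log splitting enabled)
    }
  }
}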
/**
* Process shutdown server removing any assignments.
* @param sn Server that went down.

View File

@@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
@@ -170,7 +169,7 @@ public class ServerManager {
LOG.info(message);
if (existingServer.getStartcode() < serverName.getStartcode()) {
LOG.info("Triggering server recovery; existingServer " +
existingServer + " looks stale");
existingServer + " looks stale, new server:" + serverName);
expireServer(existingServer);
}
throw new PleaseHoldException(message);
@@ -219,8 +218,8 @@ public class ServerManager {
if (this.deadservers.cleanPreviousInstance(serverName)) {
// This server has now become alive after we marked it as dead.
// We removed it's previous entry from the dead list to reflect it.
LOG.debug("Server " + serverName + " came back up, removed it from the" +
" dead servers list");
LOG.debug(what + ":" + " Server " + serverName + " came back up," +
" removed it from the dead servers list");
}
}
@@ -353,30 +352,15 @@ public class ServerManager {
}
return;
}
CatalogTracker ct = this.master.getCatalogTracker();
// Was this server carrying root?
boolean carryingRoot;
try {
ServerName address = ct.getRootLocation();
carryingRoot = address != null && address.equals(serverName);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.info("Interrupted");
return;
}
// Was this server carrying meta? Can't ask CatalogTracker because it
// may have reset the meta location as null already (it may have already
// run into fact that meta is dead). I can ask assignment manager. It
// has an inmemory list of who has what. This list will be cleared as we
// process the dead server but should be fine asking it now.
ServerName address = ct.getMetaLocation();
boolean carryingMeta = address != null && address.equals(serverName);
boolean carryingRoot = services.getAssignmentManager().isCarryingRoot(serverName);
boolean carryingMeta = services.getAssignmentManager().isCarryingMeta(serverName);
if (carryingRoot || carryingMeta) {
this.services.getExecutorService().submit(new MetaServerShutdownHandler(this.master,
this.services, this.deadservers, serverName, carryingRoot, carryingMeta));
} else {
this.services.getExecutorService().submit(new ServerShutdownHandler(this.master,
this.services, this.deadservers, serverName));
this.services, this.deadservers, serverName, true));
}
LOG.debug("Added=" + serverName +
" to dead servers, submitted shutdown handler to be executed, root=" +

View File

@@ -36,7 +36,8 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler {
final MasterServices services,
final DeadServer deadServers, final ServerName serverName,
final boolean carryingRoot, final boolean carryingMeta) {
super(server, services, deadServers, serverName, EventType.M_META_SERVER_SHUTDOWN);
super(server, services, deadServers, serverName,
EventType.M_META_SERVER_SHUTDOWN, true);
this.carryingRoot = carryingRoot;
this.carryingMeta = carryingMeta;
}

View File

@@ -19,6 +19,7 @@
*/
package org.apache.hadoop.hbase.master.handler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -90,20 +91,23 @@ public class OpenedRegionHandler extends EventHandler implements TotesHRegionInfo
@Override
public void process() {
debugLog(regionInfo, "Handling OPENED event for " + this.regionInfo.getEncodedName() +
"; deleting unassigned node");
debugLog(regionInfo, "Handling OPENED event for " +
this.regionInfo.getRegionNameAsString() + " from " + this.sn.toString()
+ "; deleting unassigned node");
// Remove region from in-memory transition and unassigned node from ZK
try {
ZKAssign.deleteOpenedNode(server.getZooKeeper(),
regionInfo.getEncodedName());
} catch (KeeperException e) {
server.abort("Error deleting OPENED node in ZK for transition ZK node (" +
regionInfo.getEncodedName() + ")", e);
server.abort("Error deleting OPENED node in ZK for transition ZK node ("
+ regionInfo.getRegionNameAsString() + ")", e);
}
// Code to defend against case where we get SPLIT before region open
// processing completes; temporary till we make SPLITs go via zk -- 0.92.
if (this.assignmentManager.isRegionInTransition(regionInfo) != null) {
this.assignmentManager.regionOnline(regionInfo, this.sn);
debugLog(regionInfo, "region online: "
+ regionInfo.getRegionNameAsString() + " on " + this.sn.toString());
} else {
LOG.warn("Skipping the onlining of " + regionInfo.getRegionNameAsString() +
" because regions is NOT in RIT -- presuming this is because it SPLIT");

View File

@@ -54,14 +54,18 @@ public class ServerShutdownHandler extends EventHandler {
private final ServerName serverName;
private final MasterServices services;
private final DeadServer deadServers;
private final boolean shouldSplitHlog; // whether to split HLog or not
public ServerShutdownHandler(final Server server, final MasterServices services,
final DeadServer deadServers, final ServerName serverName) {
this(server, services, deadServers, serverName, EventType.M_SERVER_SHUTDOWN);
final DeadServer deadServers, final ServerName serverName,
final boolean shouldSplitHlog) {
this(server, services, deadServers, serverName, EventType.M_SERVER_SHUTDOWN,
shouldSplitHlog);
}
ServerShutdownHandler(final Server server, final MasterServices services,
final DeadServer deadServers, final ServerName serverName, EventType type) {
final DeadServer deadServers, final ServerName serverName, EventType type,
final boolean shouldSplitHlog) {
super(server, type);
this.serverName = serverName;
this.server = server;
@@ -70,6 +74,7 @@ public class ServerShutdownHandler extends EventHandler {
if (!this.deadServers.contains(this.serverName)) {
LOG.warn(this.serverName + " is NOT in deadservers; it should be!");
}
this.shouldSplitHlog = shouldSplitHlog;
}
@Override
@@ -165,9 +170,54 @@ public class ServerShutdownHandler extends EventHandler {
public void process() throws IOException {
final ServerName serverName = this.serverName;
LOG.info("Splitting logs for " + serverName);
try {
if (this.shouldSplitHlog) {
LOG.info("Splitting logs for " + serverName);
this.services.getMasterFileSystem().splitLog(serverName);
}
// Assign root and meta if we were carrying them.
if (isCarryingRoot()) { // -ROOT-
LOG.info("Server " + serverName +
" was carrying ROOT. Trying to assign.");
this.services.getAssignmentManager().
regionOffline(HRegionInfo.ROOT_REGIONINFO);
verifyAndAssignRootWithRetries();
}
// Carrying meta?
if (isCarryingMeta()) {
LOG.info("Server " + serverName +
" was carrying META. Trying to assign.");
this.services.getAssignmentManager().
regionOffline(HRegionInfo.FIRST_META_REGIONINFO);
this.services.getAssignmentManager().assignMeta();
}
// We don't want a worker thread in the MetaServerShutdownHandler
// executor pool to block waiting on availability of the -ROOT-
// and .META. server. Otherwise, it could run into the following issue:
// 1. The current MetaServerShutdownHandler instance for RS1 waits for
// .META. to come online.
// 2. The newly assigned .META. region server RS2 is shut down right after
// it opens the .META. region, so the MetaServerShutdownHandler
// instance for RS1 stays blocked.
// 3. A new instance of MetaServerShutdownHandler for RS2 is queued.
// 4. The newly assigned .META. region server RS3 is shut down right after
// it opens the .META. region, so the MetaServerShutdownHandler
// instances for RS1 and RS2 stay blocked.
// 5. A new instance of MetaServerShutdownHandler for RS3 is queued.
// 6. Repeat until we run out of MetaServerShutdownHandler worker threads.
// The solution here is to resubmit a ServerShutdownHandler request to process
// user regions on that server so that the MetaServerShutdownHandler
// executor pool always stays available.
if (isCarryingRoot() || isCarryingMeta()) { // -ROOT- or .META.
this.services.getExecutorService().submit(new ServerShutdownHandler(
this.server, this.services, this.deadServers, serverName, false));
this.deadServers.add(serverName);
return;
}
// Clean out anything in regions in transition. Being conservative and
// doing after log splitting. Could do some states before -- OPENING?
@@ -177,18 +227,6 @@ public class ServerShutdownHandler extends EventHandler {
this.services.getAssignmentManager()
.processServerShutdown(this.serverName);
// Assign root and meta if we were carrying them.
if (isCarryingRoot()) { // -ROOT-
LOG.info("Server " + serverName +
" was carrying ROOT. Trying to assign.");
verifyAndAssignRootWithRetries();
}
// Carrying meta?
if (isCarryingMeta()) {
LOG.info("Server " + serverName + " was carrying META. Trying to assign.");
this.services.getAssignmentManager().assignMeta();
}
// Wait on meta to come online; we need it to progress.
// TODO: Best way to hold strictly here? We should build this retry logic
@@ -225,7 +263,8 @@ public class ServerShutdownHandler extends EventHandler {
for (RegionState rit : regionsInTransition) {
if (!rit.isClosing() && !rit.isPendingClose()) {
LOG.debug("Removed " + rit.getRegion().getRegionNameAsString() +
" from list of regions to assign because in RIT");
" from list of regions to assign because in RIT" + " region state: "
+ rit.getState());
hris.remove(rit.getRegion());
}
}
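The comment in process() describes a classic bounded-pool deadlock: every worker blocks waiting for .META., but bringing .META. online needs a task in the same pool. A self-contained illustration of the resubmit pattern the patch adopts (hypothetical names, plain JDK, not HBase code):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ShutdownPoolExample {
  private final ExecutorService pool = Executors.newFixedThreadPool(3);

  // Do only the quick catalog reassignment inline, then resubmit the
  // user-region work as a fresh task so this worker frees up promptly
  // instead of blocking until .META. is back online.
  void handleMetaServerShutdown(final String serverName) {
    reassignCatalogRegions(serverName);
    pool.submit(new Runnable() {
      @Override
      public void run() {
        handleUserRegions(serverName);
      }
    });
  }

  private void reassignCatalogRegions(String serverName) { /* ... */ }
  private void handleUserRegions(String serverName) { /* ... */ }
}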

View File

@@ -571,7 +571,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
LOG.error(errorMsg);
abort(errorMsg);
}
while (tracker.blockUntilAvailable(this.msgInterval) == null) {
while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
if (this.stopped) {
throw new IOException("Received the shutdown message while waiting.");
}
@@ -715,7 +715,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
// Interrupt catalog tracker here in case any regions being opened out in
// handlers are stuck waiting on meta or root.
if (this.catalogTracker != null) this.catalogTracker.stop();
if (this.fsOk) waitOnAllRegionsToClose(abortRequested);
if (this.fsOk) {
waitOnAllRegionsToClose(abortRequested);
LOG.info("stopping server " + this.serverNameFromMasterPOV +
"; all regions closed.");
}
// Make sure the proxy is down.
if (this.hbaseMaster != null) {
@@ -729,6 +733,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
LOG.warn("Failed deleting my ephemeral node", e);
}
this.zooKeeper.close();
LOG.info("stopping server " + this.serverNameFromMasterPOV +
"; zookeeper connection closed.");
if (!killed) {
join();
}

View File

@@ -136,7 +136,15 @@ public class CloseRegionHandler extends EventHandler {
this.rsServices.removeFromOnlineRegions(regionInfo.getEncodedName());
if (this.zk) setClosedState(expectedVersion, region);
if (this.zk) {
if (setClosedState(expectedVersion, region)) {
LOG.debug("set region closed state in zk successfully for region " +
name + " sn name: " + this.server.getServerName());
} else {
LOG.debug("set region closed state in zk unsuccessfully for region " +
name + " sn name: " + this.server.getServerName());
}
}
// Done! Region is closed on this RS
LOG.debug("Closed region " + region.getRegionNameAsString());
@@ -149,8 +157,9 @@ public class CloseRegionHandler extends EventHandler {
/**
* Transition ZK node to CLOSED
* @param expectedVersion
* @return If the state is set successfully
*/
private void setClosedState(final int expectedVersion, final HRegion region) {
private boolean setClosedState(final int expectedVersion, final HRegion region) {
try {
if (ZKAssign.transitionNodeClosed(server.getZooKeeper(), regionInfo,
server.getServerName(), expectedVersion) == FAILED) {
@@ -158,18 +167,20 @@ public class CloseRegionHandler extends EventHandler {
" CLOSING to CLOSED got a version mismatch, someone else clashed " +
"so now unassigning");
region.close();
return;
return false;
}
} catch (NullPointerException e) {
// I've seen NPE when table was deleted while close was running in unit tests.
LOG.warn("NPE during close -- catching and continuing...", e);
return false;
} catch (KeeperException e) {
LOG.error("Failed transitioning node from CLOSING to CLOSED", e);
return;
return false;
} catch (IOException e) {
LOG.error("Failed to close region after failing to transition", e);
return;
return false;
}
return true;
}
/**

View File

@@ -134,7 +134,8 @@ public class OpenRegionHandler extends EventHandler {
}
// Done! Successful region open
LOG.debug("Opened " + name);
LOG.debug("Opened " + name + " on server:" +
this.server.getServerName());
} finally {
this.rsServices.getRegionsInTransitionInRS().
remove(this.regionInfo.getEncodedNameAsBytes());
@@ -273,8 +274,11 @@ public class OpenRegionHandler extends EventHandler {
LOG.warn("Completed the OPEN of region " + name +
" but when transitioning from " +
" OPENING to OPENED got a version mismatch, someone else clashed " +
"so now unassigning -- closing region");
"so now unassigning -- closing region on server: " +
this.server.getServerName());
} else {
LOG.debug("region transitioned to opened in zookeeper: " +
r.getRegionInfo() + ", server: " + this.server.getServerName());
result = true;
}
} catch (KeeperException e) {

View File

@@ -399,7 +399,7 @@ public class ReplicationZookeeper {
* @throws KeeperException
*/
public boolean getReplication() throws KeeperException {
byte [] data = this.statusTracker.getData();
byte [] data = this.statusTracker.getData(false);
if (data == null || data.length == 0) {
setReplicating(true);
return true;

View File

@@ -53,7 +53,7 @@ public class ClusterStatusTracker extends ZooKeeperNodeTracker {
* @return true if the cluster is up, false if not
*/
public boolean isClusterUp() {
return super.getData() != null;
return super.getData(false) != null;
}
/**

View File

@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.RootLocationEditor;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
/**
* Tracks the root region server location node in zookeeper.
@@ -49,7 +50,7 @@ public class RootRegionTracker extends ZooKeeperNodeTracker {
* @return true if root region location is available, false if not
*/
public boolean isLocationAvailable() {
return super.getData() != null;
return super.getData(true) != null;
}
/**
@@ -58,12 +59,14 @@ public class RootRegionTracker extends ZooKeeperNodeTracker {
* @throws InterruptedException
*/
public ServerName getRootRegionLocation() throws InterruptedException {
return dataToServerName(super.getData());
return dataToServerName(super.getData(true));
}
/**
* Gets the root region location, if available, and waits for up to the
* specified timeout if not immediately available.
* Given that the zookeeper notification could be delayed, we will try to
* get the latest data.
* @param timeout maximum time to wait, in millis
* @return server name for server hosting root region formatted as per
* {@link ServerName}, or null if none available
@@ -77,7 +80,7 @@ public class RootRegionTracker extends ZooKeeperNodeTracker {
LOG.error(errorMsg);
throw new IllegalArgumentException(errorMsg);
}
return dataToServerName(super.blockUntilAvailable(timeout));
return dataToServerName(super.blockUntilAvailable(timeout, true));
}
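Because a ZooKeeper watch notification can lag the actual znode change, the tracker now re-reads the node (refresh=true) instead of trusting its cache. A hedged usage sketch, assuming the 0.92-era constructor taking a ZooKeeperWatcher and an Abortable (the wrapper class is illustrative only):

import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

final class RootLookupExample {
  static ServerName findRoot(ZooKeeperWatcher watcher, Abortable abortable)
      throws InterruptedException {
    RootRegionTracker tracker = new RootRegionTracker(watcher, abortable);
    tracker.start();
    // Bounded wait; internally refreshes from ZK so a delayed watch
    // notification cannot serve up a stale -ROOT- location.
    return tracker.waitRootRegionLocation(5 * 1000L); // null if timed out
  }
}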
/*

View File

@@ -101,7 +101,7 @@ public abstract class ZooKeeperNodeTracker extends ZooKeeperListener {
*/
public synchronized byte [] blockUntilAvailable()
throws InterruptedException {
return blockUntilAvailable(0);
return blockUntilAvailable(0, false);
}
/**
@@ -113,12 +113,19 @@ public abstract class ZooKeeperNodeTracker extends ZooKeeperListener {
* @return data of the node
* @throws InterruptedException if the waiting thread is interrupted
*/
public synchronized byte [] blockUntilAvailable(long timeout)
public synchronized byte [] blockUntilAvailable(long timeout, boolean refresh)
throws InterruptedException {
if (timeout < 0) throw new IllegalArgumentException();
boolean notimeout = timeout == 0;
long startTime = System.currentTimeMillis();
long remaining = timeout;
if (refresh) {
try {
this.data = ZKUtil.getDataAndWatch(watcher, node);
} catch (KeeperException e) {
abortable.abort("Unexpected exception handling blockUntilAvailable", e);
}
}
while (!this.stopped && (notimeout || remaining > 0) && this.data == null) {
if (notimeout) {
wait();
@@ -127,7 +134,7 @@ public abstract class ZooKeeperNodeTracker extends ZooKeeperListener {
wait(remaining);
remaining = timeout - (System.currentTimeMillis() - startTime);
}
return data;
return this.data;
}
/**
@@ -136,11 +143,18 @@ public abstract class ZooKeeperNodeTracker extends ZooKeeperListener {
* <p>If the node is currently available, the most up-to-date known version of
* the data is returned. If the node is not currently available, null is
* returned.
*
* @param refresh whether to refresh the data by calling ZK directly.
* @return data of the node, null if unavailable
*/
public synchronized byte [] getData() {
return data;
public synchronized byte [] getData(boolean refresh) {
if (refresh) {
try {
this.data = ZKUtil.getDataAndWatch(watcher, node);
} catch (KeeperException e) {
abortable.abort("Unexpected exception handling getData", e);
}
}
return this.data;
}
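Taken together, the two methods above define the new refresh contract: refresh=false answers from the watcher-maintained cache, while refresh=true re-reads the znode via ZKUtil.getDataAndWatch() first. A hedged sketch with a trivial subclass (node path and class name are illustrative only):

import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

final class ExampleTracker extends ZooKeeperNodeTracker {
  ExampleTracker(ZooKeeperWatcher watcher, Abortable abortable) {
    super(watcher, "/hbase/example", abortable);
  }

  byte[] cachedRead() {
    return getData(false); // watcher-maintained cache; may lag ZK briefly
  }

  byte[] freshRead() {
    return getData(true);  // re-reads the znode before answering
  }

  byte[] boundedWait() throws InterruptedException {
    return blockUntilAvailable(3 * 1000L, true); // refresh, wait up to 3s
  }
}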
public String getNode() {

View File

@@ -109,7 +109,7 @@ public class TestZooKeeperNodeTracker {
zk.registerListener(localTracker);
// Make sure we don't have a node
assertNull(localTracker.getData());
assertNull(localTracker.getData(false));
// Spin up a thread with another ZKNT and have it block
WaitToGetDataThread thread = new WaitToGetDataThread(zk, node);
@@ -142,17 +142,17 @@ public class TestZooKeeperNodeTracker {
thread.join();
// Both trackers should have the node available with data one
assertNotNull(localTracker.getData());
assertNotNull(localTracker.getData(false));
assertNotNull(localTracker.blockUntilAvailable());
assertTrue(Bytes.equals(localTracker.getData(), dataOne));
assertTrue(Bytes.equals(localTracker.getData(false), dataOne));
assertTrue(thread.hasData);
assertTrue(Bytes.equals(thread.tracker.getData(), dataOne));
assertTrue(Bytes.equals(thread.tracker.getData(false), dataOne));
LOG.info("Successfully got data one");
// Make sure it's available and with the expected data
assertNotNull(secondTracker.getData());
assertNotNull(secondTracker.getData(false));
assertNotNull(secondTracker.blockUntilAvailable());
assertTrue(Bytes.equals(secondTracker.getData(), dataOne));
assertTrue(Bytes.equals(secondTracker.getData(false), dataOne));
LOG.info("Successfully got data one with the second tracker");
// Drop the node
@@ -166,8 +166,8 @@ public class TestZooKeeperNodeTracker {
// Verify other guys don't have data
assertFalse(thread.hasData);
assertNull(secondTracker.getData());
assertNull(localTracker.getData());
assertNull(secondTracker.getData(false));
assertNull(localTracker.getData(false));
LOG.info("Successfully made unavailable");
// Create with second data
@@ -178,14 +178,14 @@ public class TestZooKeeperNodeTracker {
thread.join();
// All trackers should have the node available with data two
assertNotNull(localTracker.getData());
assertNotNull(localTracker.getData(false));
assertNotNull(localTracker.blockUntilAvailable());
assertTrue(Bytes.equals(localTracker.getData(), dataTwo));
assertNotNull(secondTracker.getData());
assertTrue(Bytes.equals(localTracker.getData(false), dataTwo));
assertNotNull(secondTracker.getData(false));
assertNotNull(secondTracker.blockUntilAvailable());
assertTrue(Bytes.equals(secondTracker.getData(), dataTwo));
assertTrue(Bytes.equals(secondTracker.getData(false), dataTwo));
assertTrue(thread.hasData);
assertTrue(Bytes.equals(thread.tracker.getData(), dataTwo));
assertTrue(Bytes.equals(thread.tracker.getData(false), dataTwo));
LOG.info("Successfully got data two on all trackers and threads");
// Change the data back to data one
@@ -195,14 +195,14 @@ public class TestZooKeeperNodeTracker {
zkListener.waitForDataChange();
// All trackers should have the node available with data one
assertNotNull(localTracker.getData());
assertNotNull(localTracker.getData(false));
assertNotNull(localTracker.blockUntilAvailable());
assertTrue(Bytes.equals(localTracker.getData(), dataOne));
assertNotNull(secondTracker.getData());
assertTrue(Bytes.equals(localTracker.getData(false), dataOne));
assertNotNull(secondTracker.getData(false));
assertNotNull(secondTracker.blockUntilAvailable());
assertTrue(Bytes.equals(secondTracker.getData(), dataOne));
assertTrue(Bytes.equals(secondTracker.getData(false), dataOne));
assertTrue(thread.hasData);
assertTrue(Bytes.equals(thread.tracker.getData(), dataOne));
assertTrue(Bytes.equals(thread.tracker.getData(false), dataOne));
LOG.info("Successfully got data one following a data change on all trackers and threads");
}