HBASE-4015 Refactor the TimeoutMonitor to make it less racy -- reapply with HBASE-4015_reprepared_trunk_2.patch
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1166833 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
053493a967
commit
9f0a3193d8
|
@ -249,6 +249,10 @@ Release 0.91.0 - Unreleased
|
||||||
weak consistency (Jieshan Bean)
|
weak consistency (Jieshan Bean)
|
||||||
HBASE-4297 TableMapReduceUtil overwrites user supplied options
|
HBASE-4297 TableMapReduceUtil overwrites user supplied options
|
||||||
(Jan Lukavsky)
|
(Jan Lukavsky)
|
||||||
|
HBASE-4015 Refactor the TimeoutMonitor to make it less racy
|
||||||
|
(ramkrishna.s.vasudevan)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack)
|
HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack)
|
||||||
|
|
|
@ -333,7 +333,7 @@ public interface HRegionInterface extends VersionedProtocol, Stoppable, Abortabl
|
||||||
* @param region
|
* @param region
|
||||||
* region to open
|
* region to open
|
||||||
* @return RegionOpeningState
|
* @return RegionOpeningState
|
||||||
* OPENED - if region opened succesfully.
|
* OPENED - if region open request was successful.
|
||||||
* ALREADY_OPENED - if the region was already opened.
|
* ALREADY_OPENED - if the region was already opened.
|
||||||
* FAILED_OPENING - if region opening failed.
|
* FAILED_OPENING - if region opening failed.
|
||||||
*
|
*
|
||||||
|
@ -341,6 +341,22 @@ public interface HRegionInterface extends VersionedProtocol, Stoppable, Abortabl
|
||||||
*/
|
*/
|
||||||
public RegionOpeningState openRegion(final HRegionInfo region) throws IOException;
|
public RegionOpeningState openRegion(final HRegionInfo region) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Opens the specified region.
|
||||||
|
* @param region
|
||||||
|
* region to open
|
||||||
|
* @param versionOfOfflineNode
|
||||||
|
* the version of znode to compare when RS transitions the znode from
|
||||||
|
* OFFLINE state.
|
||||||
|
* @return RegionOpeningState
|
||||||
|
* OPENED - if region open request was successful.
|
||||||
|
* ALREADY_OPENED - if the region was already opened.
|
||||||
|
* FAILED_OPENING - if region opening failed.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public RegionOpeningState openRegion(HRegionInfo region, int versionOfOfflineNode)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Opens the specified regions.
|
* Opens the specified regions.
|
||||||
* @param regions regions to open
|
* @param regions regions to open
|
||||||
|
|
|
@ -159,6 +159,9 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
|
|
||||||
private final ExecutorService executorService;
|
private final ExecutorService executorService;
|
||||||
|
|
||||||
|
//Thread pool executor service for timeout monitor
|
||||||
|
private java.util.concurrent.ExecutorService threadPoolExecutorService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a new assignment manager.
|
* Constructs a new assignment manager.
|
||||||
*
|
*
|
||||||
|
@ -170,7 +173,8 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public AssignmentManager(Server master, ServerManager serverManager,
|
public AssignmentManager(Server master, ServerManager serverManager,
|
||||||
CatalogTracker catalogTracker, final ExecutorService service)
|
CatalogTracker catalogTracker, final ExecutorService service,
|
||||||
|
final java.util.concurrent.ExecutorService threadPoolExecutorService)
|
||||||
throws KeeperException, IOException {
|
throws KeeperException, IOException {
|
||||||
super(master.getZooKeeper());
|
super(master.getZooKeeper());
|
||||||
this.master = master;
|
this.master = master;
|
||||||
|
@ -190,6 +194,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
this.maximumAssignmentAttempts =
|
this.maximumAssignmentAttempts =
|
||||||
this.master.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10);
|
this.master.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10);
|
||||||
this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
|
this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
|
||||||
|
this.threadPoolExecutorService = threadPoolExecutorService;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -475,9 +480,20 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
|
|
||||||
// Just insert region into RIT
|
// Just insert region into RIT
|
||||||
// If this never updates the timeout will trigger new assignment
|
// If this never updates the timeout will trigger new assignment
|
||||||
|
if (regionInfo.isMetaRegion() || regionInfo.isRootRegion()) {
|
||||||
regionsInTransition.put(encodedRegionName, new RegionState(
|
regionsInTransition.put(encodedRegionName, new RegionState(
|
||||||
regionInfo, RegionState.State.OPENING,
|
regionInfo, RegionState.State.OPENING, data.getStamp(), data
|
||||||
data.getStamp(), data.getOrigin()));
|
.getOrigin()));
|
||||||
|
// If ROOT or .META. table is waiting for timeout monitor to assign
|
||||||
|
// it may take lot of time when the assignment.timeout.period is
|
||||||
|
// the default value which may be very long. We will not be able
|
||||||
|
// to serve any request during this time.
|
||||||
|
// So we will assign the ROOT and .META. region immediately.
|
||||||
|
processOpeningState(regionInfo);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
regionsInTransition.put(encodedRegionName, new RegionState(regionInfo,
|
||||||
|
RegionState.State.OPENING, data.getStamp(), data.getOrigin()));
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case RS_ZK_REGION_OPENED:
|
case RS_ZK_REGION_OPENED:
|
||||||
|
@ -1109,12 +1125,21 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
|
|
||||||
public void assign(HRegionInfo region, boolean setOfflineInZK,
|
public void assign(HRegionInfo region, boolean setOfflineInZK,
|
||||||
boolean forceNewPlan) {
|
boolean forceNewPlan) {
|
||||||
String tableName = region.getTableNameAsString();
|
assign(region, setOfflineInZK, forceNewPlan, false);
|
||||||
boolean disabled = this.zkTable.isDisabledTable(tableName);
|
}
|
||||||
if (disabled || this.zkTable.isDisablingTable(tableName)) {
|
|
||||||
LOG.info("Table " + tableName + (disabled? " disabled;": " disabling;") +
|
/**
|
||||||
" skipping assign of " + region.getRegionNameAsString());
|
* @param region
|
||||||
offlineDisabledRegion(region);
|
* @param setOfflineInZK
|
||||||
|
* @param forceNewPlan
|
||||||
|
* @param reassign
|
||||||
|
* - true if timeout monitor calls assign
|
||||||
|
*/
|
||||||
|
public void assign(HRegionInfo region, boolean setOfflineInZK,
|
||||||
|
boolean forceNewPlan, boolean reassign) {
|
||||||
|
//If reassign is true do not call disableRegionIfInRIT as
|
||||||
|
// we have not yet moved the znode to OFFLINE state.
|
||||||
|
if (!reassign && disableRegionIfInRIT(region)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (this.serverManager.isClusterShutdown()) {
|
if (this.serverManager.isClusterShutdown()) {
|
||||||
|
@ -1122,9 +1147,10 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
region.getRegionNameAsString());
|
region.getRegionNameAsString());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
RegionState state = addToRegionsInTransition(region);
|
RegionState state = addToRegionsInTransition(region,
|
||||||
|
reassign);
|
||||||
synchronized (state) {
|
synchronized (state) {
|
||||||
assign(state, setOfflineInZK, forceNewPlan);
|
assign(region, state, setOfflineInZK, forceNewPlan, reassign);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1282,11 +1308,19 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
* @return The current RegionState
|
* @return The current RegionState
|
||||||
*/
|
*/
|
||||||
private RegionState addToRegionsInTransition(final HRegionInfo region) {
|
private RegionState addToRegionsInTransition(final HRegionInfo region) {
|
||||||
|
return addToRegionsInTransition(region, false);
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* @param region
|
||||||
|
* @param reassign
|
||||||
|
* @return The current RegionState
|
||||||
|
*/
|
||||||
|
private RegionState addToRegionsInTransition(final HRegionInfo region,
|
||||||
|
boolean reassign) {
|
||||||
synchronized (regionsInTransition) {
|
synchronized (regionsInTransition) {
|
||||||
return forceRegionStateToOffline(region);
|
return forceRegionStateToOffline(region, reassign);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets regions {@link RegionState} to {@link RegionState.State#OFFLINE}.
|
* Sets regions {@link RegionState} to {@link RegionState.State#OFFLINE}.
|
||||||
* Caller must hold lock on this.regionsInTransition.
|
* Caller must hold lock on this.regionsInTransition.
|
||||||
|
@ -1294,15 +1328,35 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
* @return Amended RegionState.
|
* @return Amended RegionState.
|
||||||
*/
|
*/
|
||||||
private RegionState forceRegionStateToOffline(final HRegionInfo region) {
|
private RegionState forceRegionStateToOffline(final HRegionInfo region) {
|
||||||
|
return forceRegionStateToOffline(region, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets regions {@link RegionState} to {@link RegionState.State#OFFLINE}.
|
||||||
|
* Caller must hold lock on this.regionsInTransition.
|
||||||
|
* @param region
|
||||||
|
* @param reassign
|
||||||
|
* @return Amended RegionState.
|
||||||
|
*/
|
||||||
|
private RegionState forceRegionStateToOffline(final HRegionInfo region,
|
||||||
|
boolean reassign) {
|
||||||
String encodedName = region.getEncodedName();
|
String encodedName = region.getEncodedName();
|
||||||
RegionState state = this.regionsInTransition.get(encodedName);
|
RegionState state = this.regionsInTransition.get(encodedName);
|
||||||
if (state == null) {
|
if (state == null) {
|
||||||
state = new RegionState(region, RegionState.State.OFFLINE);
|
state = new RegionState(region, RegionState.State.OFFLINE);
|
||||||
this.regionsInTransition.put(encodedName, state);
|
this.regionsInTransition.put(encodedName, state);
|
||||||
} else {
|
} else {
|
||||||
|
// If invoked from timeout monitor do not force in-memory to OFFLINE.
|
||||||
|
// Based on the
|
||||||
|
// znode state we will decide if to change in-memory state to OFFLINE or
|
||||||
|
// not. It
|
||||||
|
// will
|
||||||
|
// be done before setting the znode to OFFLINE state.
|
||||||
|
if (!reassign) {
|
||||||
LOG.debug("Forcing OFFLINE; was=" + state);
|
LOG.debug("Forcing OFFLINE; was=" + state);
|
||||||
state.update(RegionState.State.OFFLINE);
|
state.update(RegionState.State.OFFLINE);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return state;
|
return state;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1311,11 +1365,29 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
* @param state
|
* @param state
|
||||||
* @param setOfflineInZK
|
* @param setOfflineInZK
|
||||||
* @param forceNewPlan
|
* @param forceNewPlan
|
||||||
|
* @param reassign
|
||||||
*/
|
*/
|
||||||
private void assign(final RegionState state, final boolean setOfflineInZK,
|
private void assign(final HRegionInfo region, final RegionState state,
|
||||||
final boolean forceNewPlan) {
|
final boolean setOfflineInZK, final boolean forceNewPlan,
|
||||||
|
boolean reassign) {
|
||||||
for (int i = 0; i < this.maximumAssignmentAttempts; i++) {
|
for (int i = 0; i < this.maximumAssignmentAttempts; i++) {
|
||||||
if (setOfflineInZK && !setOfflineInZooKeeper(state)) return;
|
int versionOfOfflineNode = -1;
|
||||||
|
if (setOfflineInZK) {
|
||||||
|
// get the version of the znode after setting it to OFFLINE.
|
||||||
|
// versionOfOfflineNode will be -1 if the znode was not set to OFFLINE
|
||||||
|
versionOfOfflineNode = setOfflineInZooKeeper(state,
|
||||||
|
reassign);
|
||||||
|
if(versionOfOfflineNode != -1){
|
||||||
|
if (disableRegionIfInRIT(region)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (setOfflineInZK && versionOfOfflineNode == -1) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (this.master.isStopped()) {
|
if (this.master.isStopped()) {
|
||||||
LOG.debug("Server stopped; skipping assign of " + state);
|
LOG.debug("Server stopped; skipping assign of " + state);
|
||||||
return;
|
return;
|
||||||
|
@ -1334,8 +1406,9 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
state.update(RegionState.State.PENDING_OPEN, System.currentTimeMillis(),
|
state.update(RegionState.State.PENDING_OPEN, System.currentTimeMillis(),
|
||||||
plan.getDestination());
|
plan.getDestination());
|
||||||
// Send OPEN RPC. This can fail if the server on other end is is not up.
|
// Send OPEN RPC. This can fail if the server on other end is is not up.
|
||||||
|
// Pass the version that was obtained while setting the node to OFFLINE.
|
||||||
RegionOpeningState regionOpenState = serverManager.sendRegionOpen(plan
|
RegionOpeningState regionOpenState = serverManager.sendRegionOpen(plan
|
||||||
.getDestination(), state.getRegion());
|
.getDestination(), state.getRegion(), versionOfOfflineNode);
|
||||||
if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
|
if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
|
||||||
// Remove region from in-memory transition and unassigned node from ZK
|
// Remove region from in-memory transition and unassigned node from ZK
|
||||||
// While trying to enable the table the regions of the table were
|
// While trying to enable the table the regions of the table were
|
||||||
|
@ -1389,31 +1462,73 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
private boolean disableRegionIfInRIT(final HRegionInfo region) {
|
||||||
* Set region as OFFLINED up in zookeeper
|
String tableName = region.getTableNameAsString();
|
||||||
* @param state
|
boolean disabled = this.zkTable.isDisabledTable(tableName);
|
||||||
* @return True if we succeeded, false otherwise (State was incorrect or failed
|
if (disabled || this.zkTable.isDisablingTable(tableName)) {
|
||||||
* updating zk).
|
LOG.info("Table " + tableName + (disabled? " disabled;": " disabling;") +
|
||||||
*/
|
" skipping assign of " + region.getRegionNameAsString());
|
||||||
boolean setOfflineInZooKeeper(final RegionState state) {
|
offlineDisabledRegion(region);
|
||||||
if (!state.isClosed() && !state.isOffline()) {
|
return true;
|
||||||
this.master.abort("Unexpected state trying to OFFLINE; " + state,
|
}
|
||||||
new IllegalStateException());
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set region as OFFLINED up in zookeeper
|
||||||
|
*
|
||||||
|
* @param state
|
||||||
|
* @param reassign
|
||||||
|
* - true if comes from timeout monitor, false otherwise
|
||||||
|
* @return the version of the offline node if setting of the OFFLINE node was
|
||||||
|
* successful, -1 otherwise.
|
||||||
|
*/
|
||||||
|
int setOfflineInZooKeeper(final RegionState state,
|
||||||
|
boolean reassign) {
|
||||||
|
// If invoked from timeoutmonitor the current state in memory need not be
|
||||||
|
// OFFLINE.
|
||||||
|
if (!reassign && !state.isClosed() && !state.isOffline()) {
|
||||||
|
this.master.abort("Unexpected state trying to OFFLINE; " + state,
|
||||||
|
new IllegalStateException());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
boolean allowZNodeCreation = false;
|
||||||
|
// If reassign is true and the current state is PENDING_OPEN
|
||||||
|
// or OPENING then refresh the in-memory state to PENDING_OPEN. This is
|
||||||
|
// important because
|
||||||
|
// if timeoutmonitor deduces that a region was in RS_OPENING state for a long
|
||||||
|
// time but when the master forces
|
||||||
|
// the znode to OFFLINE state the RS could have opened
|
||||||
|
// the corresponding region and the
|
||||||
|
// state in znode will be RS_ZK_REGION_OPENED. The
|
||||||
|
// OpenedRegionHandler
|
||||||
|
// expects the in-memory state to be PENDING_OPEN or OPENING.
|
||||||
|
// For all other cases we can change the in-memory state to OFFLINE.
|
||||||
|
if (reassign &&
|
||||||
|
(state.getState().equals(RegionState.State.PENDING_OPEN) ||
|
||||||
|
state.getState().equals(RegionState.State.OPENING))) {
|
||||||
|
state.update(RegionState.State.PENDING_OPEN);
|
||||||
|
allowZNodeCreation = false;
|
||||||
|
} else {
|
||||||
state.update(RegionState.State.OFFLINE);
|
state.update(RegionState.State.OFFLINE);
|
||||||
|
allowZNodeCreation = true;
|
||||||
|
}
|
||||||
|
int versionOfOfflineNode = -1;
|
||||||
try {
|
try {
|
||||||
if(!ZKAssign.createOrForceNodeOffline(master.getZooKeeper(),
|
// get the version after setting the znode to OFFLINE
|
||||||
state.getRegion(), this.master.getServerName())) {
|
versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(master.getZooKeeper(),
|
||||||
LOG.warn("Attempted to create/force node into OFFLINE state before " +
|
state.getRegion(), this.master.getServerName(),
|
||||||
"completing assignment but failed to do so for " + state);
|
reassign, allowZNodeCreation);
|
||||||
return false;
|
if (versionOfOfflineNode == -1) {
|
||||||
|
LOG.warn("Attempted to create/force node into OFFLINE state before "
|
||||||
|
+ "completing assignment but failed to do so for " + state);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
|
master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
|
||||||
return false;
|
return -1;
|
||||||
}
|
}
|
||||||
return true;
|
return versionOfOfflineNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -2287,90 +2402,52 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
for (RegionState regionState : regionsInTransition.values()) {
|
for (RegionState regionState : regionsInTransition.values()) {
|
||||||
if (regionState.getStamp() + timeout <= now) {
|
if (regionState.getStamp() + timeout <= now) {
|
||||||
|
//decide on action upon timeout
|
||||||
|
actOnTimeOut(regionState);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void actOnTimeOut(RegionState regionState) {
|
||||||
HRegionInfo regionInfo = regionState.getRegion();
|
HRegionInfo regionInfo = regionState.getRegion();
|
||||||
LOG.info("Regions in transition timed out: " + regionState);
|
LOG.info("Regions in transition timed out: " + regionState);
|
||||||
// Expired! Do a retry.
|
// Expired! Do a retry.
|
||||||
switch (regionState.getState()) {
|
switch (regionState.getState()) {
|
||||||
case CLOSED:
|
case CLOSED:
|
||||||
LOG.info("Region " + regionInfo.getEncodedName() +
|
LOG.info("Region " + regionInfo.getEncodedName()
|
||||||
" has been CLOSED for too long, waiting on queued " +
|
+ " has been CLOSED for too long, waiting on queued "
|
||||||
"ClosedRegionHandler to run or server shutdown");
|
+ "ClosedRegionHandler to run or server shutdown");
|
||||||
// Update our timestamp.
|
// Update our timestamp.
|
||||||
regionState.updateTimestampToNow();
|
regionState.updateTimestampToNow();
|
||||||
break;
|
break;
|
||||||
case OFFLINE:
|
case OFFLINE:
|
||||||
LOG.info("Region has been OFFLINE for too long, " +
|
LOG.info("Region has been OFFLINE for too long, " + "reassigning "
|
||||||
"reassigning " + regionInfo.getRegionNameAsString() +
|
+ regionInfo.getRegionNameAsString() + " to a random server");
|
||||||
" to a random server");
|
invokeTimeOutManager(regionState.getRegion(),
|
||||||
assigns.put(regionState.getRegion(), Boolean.FALSE);
|
TimeOutOperationType.ASSIGN);
|
||||||
break;
|
break;
|
||||||
case PENDING_OPEN:
|
case PENDING_OPEN:
|
||||||
LOG.info("Region has been PENDING_OPEN for too " +
|
LOG.info("Region has been PENDING_OPEN for too "
|
||||||
"long, reassigning region=" +
|
+ "long, reassigning region=" + regionInfo.getRegionNameAsString());
|
||||||
regionInfo.getRegionNameAsString());
|
invokeTimeOutManager(regionState.getRegion(),
|
||||||
assigns.put(regionState.getRegion(), Boolean.TRUE);
|
TimeOutOperationType.ASSIGN);
|
||||||
break;
|
break;
|
||||||
case OPENING:
|
case OPENING:
|
||||||
LOG.info("Region has been OPENING for too " +
|
processOpeningState(regionInfo);
|
||||||
"long, reassigning region=" +
|
|
||||||
regionInfo.getRegionNameAsString());
|
|
||||||
// Should have a ZK node in OPENING state
|
|
||||||
try {
|
|
||||||
String node = ZKAssign.getNodeName(watcher,
|
|
||||||
regionInfo.getEncodedName());
|
|
||||||
Stat stat = new Stat();
|
|
||||||
RegionTransitionData data = ZKAssign.getDataNoWatch(watcher,
|
|
||||||
node, stat);
|
|
||||||
if (data == null) {
|
|
||||||
LOG.warn("Data is null, node " + node + " no longer exists");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (data.getEventType() == EventType.RS_ZK_REGION_OPENED) {
|
|
||||||
LOG.debug("Region has transitioned to OPENED, allowing " +
|
|
||||||
"watched event handlers to process");
|
|
||||||
break;
|
|
||||||
} else if (data.getEventType() !=
|
|
||||||
EventType.RS_ZK_REGION_OPENING) {
|
|
||||||
LOG.warn("While timing out a region in state OPENING, " +
|
|
||||||
"found ZK node in unexpected state: " +
|
|
||||||
data.getEventType());
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
// Attempt to transition node into OFFLINE
|
|
||||||
try {
|
|
||||||
data = new RegionTransitionData(
|
|
||||||
EventType.M_ZK_REGION_OFFLINE, regionInfo.getRegionName(),
|
|
||||||
master.getServerName());
|
|
||||||
if (ZKUtil.setData(watcher, node, data.getBytes(),
|
|
||||||
stat.getVersion())) {
|
|
||||||
// Node is now OFFLINE, let's trigger another assignment
|
|
||||||
ZKUtil.getDataAndWatch(watcher, node); // re-set the watch
|
|
||||||
LOG.info("Successfully transitioned region=" +
|
|
||||||
regionInfo.getRegionNameAsString() + " into OFFLINE" +
|
|
||||||
" and forcing a new assignment");
|
|
||||||
assigns.put(regionState.getRegion(), Boolean.TRUE);
|
|
||||||
}
|
|
||||||
} catch (KeeperException.NoNodeException nne) {
|
|
||||||
// Node did not exist, can't time this out
|
|
||||||
}
|
|
||||||
} catch (KeeperException ke) {
|
|
||||||
LOG.error("Unexpected ZK exception timing out CLOSING region",
|
|
||||||
ke);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
case OPEN:
|
case OPEN:
|
||||||
LOG.error("Region has been OPEN for too long, " +
|
LOG.error("Region has been OPEN for too long, "
|
||||||
"we don't know where region was opened so can't do anything");
|
+ "we don't know where region was opened so can't do anything");
|
||||||
synchronized (regionState) {
|
synchronized (regionState) {
|
||||||
regionState.updateTimestampToNow();
|
regionState.updateTimestampToNow();
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case PENDING_CLOSE:
|
case PENDING_CLOSE:
|
||||||
LOG.info("Region has been PENDING_CLOSE for too " +
|
LOG.info("Region has been PENDING_CLOSE for too "
|
||||||
"long, running forced unassign again on region=" +
|
+ "long, running forced unassign again on region="
|
||||||
regionInfo.getRegionNameAsString());
|
+ regionInfo.getRegionNameAsString());
|
||||||
try {
|
try {
|
||||||
// If the server got the RPC, it will transition the node
|
// If the server got the RPC, it will transition the node
|
||||||
// to CLOSING, so only do something here if no node exists
|
// to CLOSING, so only do something here if no node exists
|
||||||
|
@ -2378,35 +2455,74 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
ZKAssign.getNodeName(watcher, regionInfo.getEncodedName()))) {
|
ZKAssign.getNodeName(watcher, regionInfo.getEncodedName()))) {
|
||||||
// Queue running of an unassign -- do actual unassign
|
// Queue running of an unassign -- do actual unassign
|
||||||
// outside of the regionsInTransition lock.
|
// outside of the regionsInTransition lock.
|
||||||
unassigns.add(regionInfo);
|
invokeTimeOutManager(regionState.getRegion(),
|
||||||
|
TimeOutOperationType.UNASSIGN);
|
||||||
}
|
}
|
||||||
} catch (NoNodeException e) {
|
} catch (NoNodeException e) {
|
||||||
LOG.debug("Node no longer existed so not forcing another " +
|
LOG.debug("Node no longer existed so not forcing another "
|
||||||
"unassignment");
|
+ "unassignment");
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
LOG.warn("Unexpected ZK exception timing out a region " +
|
LOG.warn("Unexpected ZK exception timing out a region close", e);
|
||||||
"close", e);
|
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case CLOSING:
|
case CLOSING:
|
||||||
LOG.info("Region has been CLOSING for too " +
|
LOG.info("Region has been CLOSING for too "
|
||||||
"long, this should eventually complete or the server will " +
|
+ "long, this should eventually complete or the server will "
|
||||||
"expire, doing nothing");
|
+ "expire, doing nothing");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
// Finish the work for regions in PENDING_CLOSE state
|
|
||||||
for (HRegionInfo hri: unassigns) {
|
|
||||||
unassign(hri, true);
|
|
||||||
}
|
|
||||||
for (Map.Entry<HRegionInfo, Boolean> e: assigns.entrySet()){
|
|
||||||
assign(e.getKey(), false, e.getValue());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The type of operation that has to performed on TimeOut.
|
||||||
|
* This is not an inmemory state. Just an enum to determine whether
|
||||||
|
* the operation to be taken after timeout is to assign the region
|
||||||
|
* or unassign the region.
|
||||||
|
* ASSIGN - need to assign a region to an RS
|
||||||
|
* UNASSIGN - need to unassign a region
|
||||||
|
*/
|
||||||
|
public static enum TimeOutOperationType {
|
||||||
|
ASSIGN, UNASSIGN;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void processOpeningState(HRegionInfo regionInfo) {
|
||||||
|
LOG.info("Region has been OPENING for too " + "long, reassigning region="
|
||||||
|
+ regionInfo.getRegionNameAsString());
|
||||||
|
// Should have a ZK node in OPENING state
|
||||||
|
try {
|
||||||
|
String node = ZKAssign.getNodeName(watcher, regionInfo.getEncodedName());
|
||||||
|
Stat stat = new Stat();
|
||||||
|
RegionTransitionData dataInZNode = ZKAssign.getDataNoWatch(watcher, node,
|
||||||
|
stat);
|
||||||
|
if (dataInZNode == null) {
|
||||||
|
LOG.warn("Data is null, node " + node + " no longer exists");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (dataInZNode.getEventType() == EventType.RS_ZK_REGION_OPENED) {
|
||||||
|
LOG.debug("Region has transitioned to OPENED, allowing "
|
||||||
|
+ "watched event handlers to process");
|
||||||
|
return;
|
||||||
|
} else if (dataInZNode.getEventType() != EventType.RS_ZK_REGION_OPENING) {
|
||||||
|
LOG.warn("While timing out a region in state OPENING, "
|
||||||
|
+ "found ZK node in unexpected state: "
|
||||||
|
+ dataInZNode.getEventType());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// do not make the znode to OFFLINE. The timeoutManager will do it
|
||||||
|
invokeTimeOutManager(regionInfo, TimeOutOperationType.ASSIGN);
|
||||||
|
} catch (KeeperException ke) {
|
||||||
|
LOG.error("Unexpected ZK exception timing out CLOSING region", ke);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
private void invokeTimeOutManager(HRegionInfo hri,
|
||||||
|
TimeOutOperationType operation) {
|
||||||
|
TimeOutManagerCallable timeOutManager = new TimeOutManagerCallable(this,
|
||||||
|
hri, operation);
|
||||||
|
threadPoolExecutorService.submit(timeOutManager);
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Process shutdown server removing any assignments.
|
* Process shutdown server removing any assignments.
|
||||||
* @param sn Server that went down.
|
* @param sn Server that went down.
|
||||||
|
|
|
@ -28,6 +28,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -189,6 +190,11 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
||||||
|
|
||||||
private TableDescriptors tableDescriptors;
|
private TableDescriptors tableDescriptors;
|
||||||
|
|
||||||
|
// thread pool exceutor for timeout monitor. Passed from HMaster so that can
|
||||||
|
// be properly
|
||||||
|
// shudown.
|
||||||
|
private java.util.concurrent.ExecutorService threadPoolExecutorService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initializes the HMaster. The steps are as follows:
|
* Initializes the HMaster. The steps are as follows:
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -361,8 +367,9 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
||||||
this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE));
|
this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE));
|
||||||
this.catalogTracker.start();
|
this.catalogTracker.start();
|
||||||
|
|
||||||
|
threadPoolExecutorService = Executors.newCachedThreadPool();
|
||||||
this.assignmentManager = new AssignmentManager(this, serverManager,
|
this.assignmentManager = new AssignmentManager(this, serverManager,
|
||||||
this.catalogTracker, this.executorService);
|
this.catalogTracker, this.executorService, threadPoolExecutorService);
|
||||||
this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
|
this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
|
||||||
zooKeeper.registerListenerFirst(assignmentManager);
|
zooKeeper.registerListenerFirst(assignmentManager);
|
||||||
|
|
||||||
|
|
|
@ -392,8 +392,11 @@ public class ServerManager {
|
||||||
* <p>
|
* <p>
|
||||||
* @param server server to open a region
|
* @param server server to open a region
|
||||||
* @param region region to open
|
* @param region region to open
|
||||||
|
* @param versionOfOfflineNode that needs to be present in the offline node
|
||||||
|
* when RS tries to change the state from OFFLINE to other states.
|
||||||
*/
|
*/
|
||||||
public RegionOpeningState sendRegionOpen(final ServerName server, HRegionInfo region)
|
public RegionOpeningState sendRegionOpen(final ServerName server,
|
||||||
|
HRegionInfo region, int versionOfOfflineNode)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
HRegionInterface hri = getServerConnection(server);
|
HRegionInterface hri = getServerConnection(server);
|
||||||
if (hri == null) {
|
if (hri == null) {
|
||||||
|
@ -401,7 +404,11 @@ public class ServerManager {
|
||||||
" failed because no RPC connection found to this server");
|
" failed because no RPC connection found to this server");
|
||||||
return RegionOpeningState.FAILED_OPENING;
|
return RegionOpeningState.FAILED_OPENING;
|
||||||
}
|
}
|
||||||
|
if (versionOfOfflineNode == -1) {
|
||||||
return hri.openRegion(region);
|
return hri.openRegion(region);
|
||||||
|
} else {
|
||||||
|
return hri.openRegion(region, versionOfOfflineNode);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -0,0 +1,62 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2011 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master;
|
||||||
|
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.master.AssignmentManager.TimeOutOperationType;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A callable object that invokes the corresponding action that needs to be
|
||||||
|
* taken when timeout thread deducts a region was in tranisition for a long
|
||||||
|
* time. Implementing as future callable we are able to act on the timeout
|
||||||
|
* asynchronoulsy
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class TimeOutManagerCallable implements Callable<Object> {
|
||||||
|
|
||||||
|
private AssignmentManager assignmentManager;
|
||||||
|
|
||||||
|
private HRegionInfo hri;
|
||||||
|
|
||||||
|
private TimeOutOperationType operation;
|
||||||
|
|
||||||
|
public TimeOutManagerCallable(AssignmentManager assignmentManager,
|
||||||
|
HRegionInfo hri, TimeOutOperationType operation) {
|
||||||
|
this.assignmentManager = assignmentManager;
|
||||||
|
this.hri = hri;
|
||||||
|
this.operation = operation;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object call() throws Exception {
|
||||||
|
if (TimeOutOperationType.ASSIGN.equals(operation)) {
|
||||||
|
assignmentManager.assign(hri, true, true, true);
|
||||||
|
} else {
|
||||||
|
assignmentManager.unassign(hri);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -2359,6 +2359,38 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
||||||
}
|
}
|
||||||
return RegionOpeningState.OPENED;
|
return RegionOpeningState.OPENED;
|
||||||
}
|
}
|
||||||
|
@Override
|
||||||
|
@QosPriority(priority = HIGH_QOS)
|
||||||
|
public RegionOpeningState openRegion(HRegionInfo region, int versionOfOfflineNode)
|
||||||
|
throws IOException {
|
||||||
|
checkOpen();
|
||||||
|
if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) {
|
||||||
|
throw new RegionAlreadyInTransitionException("open", region
|
||||||
|
.getEncodedName());
|
||||||
|
}
|
||||||
|
HRegion onlineRegion = this.getFromOnlineRegions(region.getEncodedName());
|
||||||
|
if (null != onlineRegion) {
|
||||||
|
LOG.warn("Attempted open of " + region.getEncodedName()
|
||||||
|
+ " but already online on this server");
|
||||||
|
return RegionOpeningState.ALREADY_OPENED;
|
||||||
|
}
|
||||||
|
LOG.info("Received request to open region: "
|
||||||
|
+ region.getRegionNameAsString());
|
||||||
|
this.regionsInTransitionInRS.add(region.getEncodedNameAsBytes());
|
||||||
|
HTableDescriptor htd = this.tableDescriptors.get(region.getTableName());
|
||||||
|
// Need to pass the expected version in the constructor.
|
||||||
|
if (region.isRootRegion()) {
|
||||||
|
this.service.submit(new OpenRootHandler(this, this, region, htd,
|
||||||
|
versionOfOfflineNode));
|
||||||
|
} else if (region.isMetaRegion()) {
|
||||||
|
this.service.submit(new OpenMetaHandler(this, this, region, htd,
|
||||||
|
versionOfOfflineNode));
|
||||||
|
} else {
|
||||||
|
this.service.submit(new OpenRegionHandler(this, this, region, htd,
|
||||||
|
versionOfOfflineNode));
|
||||||
|
}
|
||||||
|
return RegionOpeningState.OPENED;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@QosPriority(priority=HIGH_QOS)
|
@QosPriority(priority=HIGH_QOS)
|
||||||
|
|
|
@ -33,6 +33,12 @@ public class OpenMetaHandler extends OpenRegionHandler {
|
||||||
public OpenMetaHandler(final Server server,
|
public OpenMetaHandler(final Server server,
|
||||||
final RegionServerServices rsServices, HRegionInfo regionInfo,
|
final RegionServerServices rsServices, HRegionInfo regionInfo,
|
||||||
final HTableDescriptor htd) {
|
final HTableDescriptor htd) {
|
||||||
super(server,rsServices, regionInfo, htd, EventType.M_RS_OPEN_META);
|
this(server, rsServices, regionInfo, htd, -1);
|
||||||
|
}
|
||||||
|
public OpenMetaHandler(final Server server,
|
||||||
|
final RegionServerServices rsServices, HRegionInfo regionInfo,
|
||||||
|
final HTableDescriptor htd, int versionOfOfflineNode) {
|
||||||
|
super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_META,
|
||||||
|
versionOfOfflineNode);
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -52,20 +52,30 @@ public class OpenRegionHandler extends EventHandler {
|
||||||
// the total open. We'll fail the open if someone hijacks our znode; we can
|
// the total open. We'll fail the open if someone hijacks our znode; we can
|
||||||
// tell this has happened if version is not as expected.
|
// tell this has happened if version is not as expected.
|
||||||
private volatile int version = -1;
|
private volatile int version = -1;
|
||||||
|
//version of the offline node that was set by the master
|
||||||
|
private volatile int versionOfOfflineNode = -1;
|
||||||
|
|
||||||
public OpenRegionHandler(final Server server,
|
public OpenRegionHandler(final Server server,
|
||||||
final RegionServerServices rsServices, HRegionInfo regionInfo,
|
final RegionServerServices rsServices, HRegionInfo regionInfo,
|
||||||
HTableDescriptor htd) {
|
HTableDescriptor htd) {
|
||||||
this (server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION);
|
this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION, -1);
|
||||||
|
}
|
||||||
|
public OpenRegionHandler(final Server server,
|
||||||
|
final RegionServerServices rsServices, HRegionInfo regionInfo,
|
||||||
|
HTableDescriptor htd, int versionOfOfflineNode) {
|
||||||
|
this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION,
|
||||||
|
versionOfOfflineNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected OpenRegionHandler(final Server server,
|
protected OpenRegionHandler(final Server server,
|
||||||
final RegionServerServices rsServices, final HRegionInfo regionInfo,
|
final RegionServerServices rsServices, final HRegionInfo regionInfo,
|
||||||
final HTableDescriptor htd, EventType eventType) {
|
final HTableDescriptor htd, EventType eventType,
|
||||||
|
final int versionOfOfflineNode) {
|
||||||
super(server, eventType);
|
super(server, eventType);
|
||||||
this.rsServices = rsServices;
|
this.rsServices = rsServices;
|
||||||
this.regionInfo = regionInfo;
|
this.regionInfo = regionInfo;
|
||||||
this.htd = htd;
|
this.htd = htd;
|
||||||
|
this.versionOfOfflineNode = versionOfOfflineNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
public HRegionInfo getRegionInfo() {
|
public HRegionInfo getRegionInfo() {
|
||||||
|
@ -86,7 +96,8 @@ public class OpenRegionHandler extends EventHandler {
|
||||||
|
|
||||||
// If fails, just return. Someone stole the region from under us.
|
// If fails, just return. Someone stole the region from under us.
|
||||||
// Calling transitionZookeeperOfflineToOpening initalizes this.version.
|
// Calling transitionZookeeperOfflineToOpening initalizes this.version.
|
||||||
if (!transitionZookeeperOfflineToOpening(encodedName)) {
|
if (!transitionZookeeperOfflineToOpening(encodedName,
|
||||||
|
versionOfOfflineNode)) {
|
||||||
LOG.warn("Region was hijacked? It no longer exists, encodedName=" +
|
LOG.warn("Region was hijacked? It no longer exists, encodedName=" +
|
||||||
encodedName);
|
encodedName);
|
||||||
return;
|
return;
|
||||||
|
@ -325,15 +336,18 @@ public class OpenRegionHandler extends EventHandler {
|
||||||
* Transition ZK node from OFFLINE to OPENING.
|
* Transition ZK node from OFFLINE to OPENING.
|
||||||
* @param encodedName Name of the znode file (Region encodedName is the znode
|
* @param encodedName Name of the znode file (Region encodedName is the znode
|
||||||
* name).
|
* name).
|
||||||
|
* @param versionOfOfflineNode - version Of OfflineNode that needs to be compared
|
||||||
|
* before changing the node's state from OFFLINE
|
||||||
* @return True if successful transition.
|
* @return True if successful transition.
|
||||||
*/
|
*/
|
||||||
boolean transitionZookeeperOfflineToOpening(final String encodedName) {
|
boolean transitionZookeeperOfflineToOpening(final String encodedName,
|
||||||
|
int versionOfOfflineNode) {
|
||||||
// TODO: should also handle transition from CLOSED?
|
// TODO: should also handle transition from CLOSED?
|
||||||
try {
|
try {
|
||||||
// Initialize the znode version.
|
// Initialize the znode version.
|
||||||
this.version =
|
this.version = ZKAssign.transitionNode(server.getZooKeeper(), regionInfo,
|
||||||
ZKAssign.transitionNodeOpening(server.getZooKeeper(),
|
server.getServerName(), EventType.M_ZK_REGION_OFFLINE,
|
||||||
regionInfo, server.getServerName());
|
EventType.RS_ZK_REGION_OPENING, versionOfOfflineNode);
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
LOG.error("Error transition from OFFLINE to OPENING for region=" +
|
LOG.error("Error transition from OFFLINE to OPENING for region=" +
|
||||||
encodedName, e);
|
encodedName, e);
|
||||||
|
|
|
@ -33,6 +33,12 @@ public class OpenRootHandler extends OpenRegionHandler {
|
||||||
public OpenRootHandler(final Server server,
|
public OpenRootHandler(final Server server,
|
||||||
final RegionServerServices rsServices, HRegionInfo regionInfo,
|
final RegionServerServices rsServices, HRegionInfo regionInfo,
|
||||||
final HTableDescriptor htd) {
|
final HTableDescriptor htd) {
|
||||||
super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_ROOT);
|
super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_ROOT, -1);
|
||||||
|
}
|
||||||
|
public OpenRootHandler(final Server server,
|
||||||
|
final RegionServerServices rsServices, HRegionInfo regionInfo,
|
||||||
|
final HTableDescriptor htd, int versionOfOfflineNode) {
|
||||||
|
super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_ROOT,
|
||||||
|
versionOfOfflineNode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -203,7 +203,6 @@ public class ZKAssign {
|
||||||
ZKUtil.setData(zkw, node, data.getBytes());
|
ZKUtil.setData(zkw, node, data.getBytes());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates or force updates an unassigned node to the OFFLINE state for the
|
* Creates or force updates an unassigned node to the OFFLINE state for the
|
||||||
* specified region.
|
* specified region.
|
||||||
|
@ -222,21 +221,91 @@ public class ZKAssign {
|
||||||
* @throws KeeperException if unexpected zookeeper exception
|
* @throws KeeperException if unexpected zookeeper exception
|
||||||
* @throws KeeperException.NodeExistsException if node already exists
|
* @throws KeeperException.NodeExistsException if node already exists
|
||||||
*/
|
*/
|
||||||
public static boolean createOrForceNodeOffline(ZooKeeperWatcher zkw,
|
public static int createOrForceNodeOffline(ZooKeeperWatcher zkw,
|
||||||
HRegionInfo region, ServerName serverName)
|
HRegionInfo region, ServerName serverName) throws KeeperException {
|
||||||
|
return createOrForceNodeOffline(zkw, region, serverName, false, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates or force updates an unassigned node to the OFFLINE state for the
|
||||||
|
* specified region.
|
||||||
|
* <p>
|
||||||
|
* Attempts to create the node but if it exists will force it to transition to
|
||||||
|
* and OFFLINE state.
|
||||||
|
* <p>
|
||||||
|
* Sets a watcher on the unassigned region node if the method is successful.
|
||||||
|
*
|
||||||
|
* <p>
|
||||||
|
* This method should be used when assigning a region.
|
||||||
|
*
|
||||||
|
* @param zkw
|
||||||
|
* zk reference
|
||||||
|
* @param region
|
||||||
|
* region to be created as offline
|
||||||
|
* @param serverName
|
||||||
|
* server event originates from
|
||||||
|
* @param reassign
|
||||||
|
* - true if invoked from timeout monitor, false otherwise
|
||||||
|
* @param allowCreation
|
||||||
|
* - true if the node has to be created newly, false otherwise
|
||||||
|
* @throws KeeperException
|
||||||
|
* if unexpected zookeeper exception
|
||||||
|
* @throws KeeperException.NodeExistsException
|
||||||
|
* if node already exists
|
||||||
|
*/
|
||||||
|
public static int createOrForceNodeOffline(ZooKeeperWatcher zkw,
|
||||||
|
HRegionInfo region, ServerName serverName,
|
||||||
|
boolean reassign, boolean allowCreation)
|
||||||
throws KeeperException {
|
throws KeeperException {
|
||||||
LOG.debug(zkw.prefix("Creating (or updating) unassigned node for " +
|
LOG.debug(zkw.prefix("Creating (or updating) unassigned node for " +
|
||||||
region.getEncodedName() + " with OFFLINE state"));
|
region.getEncodedName() + " with OFFLINE state"));
|
||||||
RegionTransitionData data = new RegionTransitionData(
|
RegionTransitionData data = new RegionTransitionData(
|
||||||
EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName);
|
EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName);
|
||||||
String node = getNodeName(zkw, region.getEncodedName());
|
String node = getNodeName(zkw, region.getEncodedName());
|
||||||
|
Stat stat = new Stat();
|
||||||
zkw.sync(node);
|
zkw.sync(node);
|
||||||
int version = ZKUtil.checkExists(zkw, node);
|
int version = ZKUtil.checkExists(zkw, node);
|
||||||
if (version == -1) {
|
if (version == -1) {
|
||||||
ZKUtil.createAndWatch(zkw, node, data.getBytes());
|
// If timeoutmonitor deducts a node to be in OPENING state but before it
|
||||||
|
// could
|
||||||
|
// transit to OFFLINE state if RS had opened the region then the Master
|
||||||
|
// deletes the
|
||||||
|
// assigned region znode. In that case the znode will not exist. So we
|
||||||
|
// should not
|
||||||
|
// create the znode again which will lead to double assignment.
|
||||||
|
if (reassign && !allowCreation) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return ZKUtil.createAndWatch(zkw, node, data.getBytes());
|
||||||
} else {
|
} else {
|
||||||
if (!ZKUtil.setData(zkw, node, data.getBytes(), version)) {
|
RegionTransitionData curDataInZNode = ZKAssign.getDataNoWatch(zkw, region
|
||||||
return false;
|
.getEncodedName(), stat);
|
||||||
|
// Do not move the node to OFFLINE if znode is in any of the following
|
||||||
|
// state.
|
||||||
|
// Because these are already executed states.
|
||||||
|
if (reassign && null != curDataInZNode) {
|
||||||
|
EventType eventType = curDataInZNode.getEventType();
|
||||||
|
if (eventType.equals(EventType.RS_ZK_REGION_CLOSING)
|
||||||
|
|| eventType.equals(EventType.RS_ZK_REGION_CLOSED)
|
||||||
|
|| eventType.equals(EventType.RS_ZK_REGION_OPENED)) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean setData = false;
|
||||||
|
try {
|
||||||
|
setData = ZKUtil.setData(zkw, node, data.getBytes(), version);
|
||||||
|
// Setdata throws KeeperException which aborts the Master. So we are
|
||||||
|
// catching it here.
|
||||||
|
// If just before setting the znode to OFFLINE if the RS has made any
|
||||||
|
// change to the
|
||||||
|
// znode state then we need to return -1.
|
||||||
|
} catch (KeeperException kpe) {
|
||||||
|
LOG.info("Version mismatch while setting the node to OFFLINE state.");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (!setData) {
|
||||||
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
// We successfully forced to OFFLINE, reset watch and handle if
|
// We successfully forced to OFFLINE, reset watch and handle if
|
||||||
// the state changed in between our set and the watch
|
// the state changed in between our set and the watch
|
||||||
|
@ -244,11 +313,11 @@ public class ZKAssign {
|
||||||
ZKAssign.getData(zkw, region.getEncodedName());
|
ZKAssign.getData(zkw, region.getEncodedName());
|
||||||
if (curData.getEventType() != data.getEventType()) {
|
if (curData.getEventType() != data.getEventType()) {
|
||||||
// state changed, need to process
|
// state changed, need to process
|
||||||
return false;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true;
|
return stat.getVersion() + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -673,6 +742,19 @@ public class ZKAssign {
|
||||||
"the node existed but was version " + stat.getVersion() +
|
"the node existed but was version " + stat.getVersion() +
|
||||||
" not the expected version " + expectedVersion));
|
" not the expected version " + expectedVersion));
|
||||||
return -1;
|
return -1;
|
||||||
|
}// the below check ensure that double assignment doesnot happen.
|
||||||
|
// When the node is created for the first time then the expected version
|
||||||
|
// that is
|
||||||
|
// passed will be -1 and the version in znode will be 0.
|
||||||
|
// In all other cases the version in znode will be > 0.
|
||||||
|
else if (beginState.equals(EventType.M_ZK_REGION_OFFLINE)
|
||||||
|
&& endState.equals(EventType.RS_ZK_REGION_OPENING)
|
||||||
|
&& expectedVersion == -1 && stat.getVersion() != 0) {
|
||||||
|
LOG.warn(zkw.prefix("Attempt to transition the " + "unassigned node for "
|
||||||
|
+ encoded + " from " + beginState + " to " + endState + " failed, "
|
||||||
|
+ "the node existed but was version " + stat.getVersion()
|
||||||
|
+ " not the expected version " + expectedVersion));
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify it is in expected state
|
// Verify it is in expected state
|
||||||
|
|
Loading…
Reference in New Issue