HBASE-4105 HBASE-4015-Making the timeout monitor less racy; third attempt

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1166857 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2011-09-08 18:50:26 +00:00
parent 3aca4bab24
commit 18e9b21862
11 changed files with 511 additions and 179 deletions

View File

@ -333,7 +333,7 @@ public interface HRegionInterface extends VersionedProtocol, Stoppable, Abortabl
* @param region
* region to open
* @return RegionOpeningState
* OPENED - if region opened succesfully.
* OPENED - if region open request was successful.
* ALREADY_OPENED - if the region was already opened.
* FAILED_OPENING - if region opening failed.
*
@ -341,6 +341,22 @@ public interface HRegionInterface extends VersionedProtocol, Stoppable, Abortabl
*/
public RegionOpeningState openRegion(final HRegionInfo region) throws IOException;
/**
* Opens the specified region.
* @param region
* region to open
* @param versionOfOfflineNode
* the version of znode to compare when RS transitions the znode from
* OFFLINE state.
* @return RegionOpeningState
* OPENED - if region open request was successful.
* ALREADY_OPENED - if the region was already opened.
* FAILED_OPENING - if region opening failed.
* @throws IOException
*/
public RegionOpeningState openRegion(HRegionInfo region, int versionOfOfflineNode)
throws IOException;
/**
* Opens the specified regions.
* @param regions regions to open

View File

@ -0,0 +1,47 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.util.concurrent.Callable;
import org.apache.hadoop.hbase.HRegionInfo;
/**
* A callable object that invokes the corresponding action that needs to be
* taken for assignment of a region in transition.
* Implementing as future callable we are able to act on the timeout
* asynchronously.
*/
public class AssignCallable implements Callable<Object> {
private AssignmentManager assignmentManager;
private HRegionInfo hri;
public AssignCallable(AssignmentManager assignmentManager, HRegionInfo hri) {
this.assignmentManager = assignmentManager;
this.hri = hri;
}
@Override
public Object call() throws Exception {
assignmentManager.assign(hri, true, true, true);
return null;
}
}

View File

@ -38,6 +38,7 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -159,6 +160,9 @@ public class AssignmentManager extends ZooKeeperListener {
private final ExecutorService executorService;
//Thread pool executor service for timeout monitor
private java.util.concurrent.ExecutorService threadPoolExecutorService;
/**
* Constructs a new assignment manager.
*
@ -190,6 +194,7 @@ public class AssignmentManager extends ZooKeeperListener {
this.maximumAssignmentAttempts =
this.master.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10);
this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
this.threadPoolExecutorService = Executors.newCachedThreadPool();
}
/**
@ -475,9 +480,20 @@ public class AssignmentManager extends ZooKeeperListener {
// Just insert region into RIT
// If this never updates the timeout will trigger new assignment
regionsInTransition.put(encodedRegionName, new RegionState(
regionInfo, RegionState.State.OPENING,
data.getStamp(), data.getOrigin()));
if (regionInfo.isMetaRegion() || regionInfo.isRootRegion()) {
regionsInTransition.put(encodedRegionName, new RegionState(
regionInfo, RegionState.State.OPENING, data.getStamp(), data
.getOrigin()));
// If ROOT or .META. table is waiting for timeout monitor to assign
// it may take lot of time when the assignment.timeout.period is
// the default value which may be very long. We will not be able
// to serve any request during this time.
// So we will assign the ROOT and .META. region immediately.
processOpeningState(regionInfo);
break;
}
regionsInTransition.put(encodedRegionName, new RegionState(regionInfo,
RegionState.State.OPENING, data.getStamp(), data.getOrigin()));
break;
case RS_ZK_REGION_OPENED:
@ -1109,12 +1125,21 @@ public class AssignmentManager extends ZooKeeperListener {
public void assign(HRegionInfo region, boolean setOfflineInZK,
boolean forceNewPlan) {
String tableName = region.getTableNameAsString();
boolean disabled = this.zkTable.isDisabledTable(tableName);
if (disabled || this.zkTable.isDisablingTable(tableName)) {
LOG.info("Table " + tableName + (disabled? " disabled;": " disabling;") +
" skipping assign of " + region.getRegionNameAsString());
offlineDisabledRegion(region);
assign(region, setOfflineInZK, forceNewPlan, false);
}
/**
* @param region
* @param setOfflineInZK
* @param forceNewPlan
* @param hijack
* - true new assignment is needed, false otherwise
*/
public void assign(HRegionInfo region, boolean setOfflineInZK,
boolean forceNewPlan, boolean hijack) {
//If hijack is true do not call disableRegionIfInRIT as
// we have not yet moved the znode to OFFLINE state.
if (!hijack && isDisabledorDisablingRegionInRIT(region)) {
return;
}
if (this.serverManager.isClusterShutdown()) {
@ -1122,9 +1147,10 @@ public class AssignmentManager extends ZooKeeperListener {
region.getRegionNameAsString());
return;
}
RegionState state = addToRegionsInTransition(region);
RegionState state = addToRegionsInTransition(region,
hijack);
synchronized (state) {
assign(state, setOfflineInZK, forceNewPlan);
assign(region, state, setOfflineInZK, forceNewPlan, hijack);
}
}
@ -1282,11 +1308,19 @@ public class AssignmentManager extends ZooKeeperListener {
* @return The current RegionState
*/
private RegionState addToRegionsInTransition(final HRegionInfo region) {
return addToRegionsInTransition(region, false);
}
/**
* @param region
* @param hijack
* @return The current RegionState
*/
private RegionState addToRegionsInTransition(final HRegionInfo region,
boolean hijack) {
synchronized (regionsInTransition) {
return forceRegionStateToOffline(region);
return forceRegionStateToOffline(region, hijack);
}
}
/**
* Sets regions {@link RegionState} to {@link RegionState.State#OFFLINE}.
* Caller must hold lock on this.regionsInTransition.
@ -1294,14 +1328,32 @@ public class AssignmentManager extends ZooKeeperListener {
* @return Amended RegionState.
*/
private RegionState forceRegionStateToOffline(final HRegionInfo region) {
return forceRegionStateToOffline(region, false);
}
/**
* Sets regions {@link RegionState} to {@link RegionState.State#OFFLINE}.
* Caller must hold lock on this.regionsInTransition.
* @param region
* @param hijack
* @return Amended RegionState.
*/
private RegionState forceRegionStateToOffline(final HRegionInfo region,
boolean hijack) {
String encodedName = region.getEncodedName();
RegionState state = this.regionsInTransition.get(encodedName);
if (state == null) {
state = new RegionState(region, RegionState.State.OFFLINE);
this.regionsInTransition.put(encodedName, state);
} else {
LOG.debug("Forcing OFFLINE; was=" + state);
state.update(RegionState.State.OFFLINE);
// If we are reassigning the node do not force in-memory state to OFFLINE.
// Based on the znode state we will decide if to change
// in-memory state to OFFLINE or not. It will
// be done before setting the znode to OFFLINE state.
if (!hijack) {
LOG.debug("Forcing OFFLINE; was=" + state);
state.update(RegionState.State.OFFLINE);
}
}
return state;
}
@ -1311,11 +1363,29 @@ public class AssignmentManager extends ZooKeeperListener {
* @param state
* @param setOfflineInZK
* @param forceNewPlan
* @param hijack
*/
private void assign(final RegionState state, final boolean setOfflineInZK,
final boolean forceNewPlan) {
private void assign(final HRegionInfo region, final RegionState state,
final boolean setOfflineInZK, final boolean forceNewPlan,
boolean hijack) {
for (int i = 0; i < this.maximumAssignmentAttempts; i++) {
if (setOfflineInZK && !setOfflineInZooKeeper(state)) return;
int versionOfOfflineNode = -1;
if (setOfflineInZK) {
// get the version of the znode after setting it to OFFLINE.
// versionOfOfflineNode will be -1 if the znode was not set to OFFLINE
versionOfOfflineNode = setOfflineInZooKeeper(state,
hijack);
if(versionOfOfflineNode != -1){
if (isDisabledorDisablingRegionInRIT(region)) {
return;
}
}
}
if (setOfflineInZK && versionOfOfflineNode == -1) {
return;
}
if (this.master.isStopped()) {
LOG.debug("Server stopped; skipping assign of " + state);
return;
@ -1334,8 +1404,9 @@ public class AssignmentManager extends ZooKeeperListener {
state.update(RegionState.State.PENDING_OPEN, System.currentTimeMillis(),
plan.getDestination());
// Send OPEN RPC. This can fail if the server on other end is is not up.
// Pass the version that was obtained while setting the node to OFFLINE.
RegionOpeningState regionOpenState = serverManager.sendRegionOpen(plan
.getDestination(), state.getRegion());
.getDestination(), state.getRegion(), versionOfOfflineNode);
if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
// Remove region from in-memory transition and unassigned node from ZK
// While trying to enable the table the regions of the table were
@ -1389,31 +1460,69 @@ public class AssignmentManager extends ZooKeeperListener {
}
}
private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
String tableName = region.getTableNameAsString();
boolean disabled = this.zkTable.isDisabledTable(tableName);
if (disabled || this.zkTable.isDisablingTable(tableName)) {
LOG.info("Table " + tableName + (disabled ? " disabled;" : " disabling;") +
" skipping assign of " + region.getRegionNameAsString());
offlineDisabledRegion(region);
return true;
}
return false;
}
/**
* Set region as OFFLINED up in zookeeper
*
* @param state
* @return True if we succeeded, false otherwise (State was incorrect or failed
* updating zk).
* @param hijack
* - true if needs to be hijacked and reassigned, false otherwise.
* @return the version of the offline node if setting of the OFFLINE node was
* successful, -1 otherwise.
*/
boolean setOfflineInZooKeeper(final RegionState state) {
if (!state.isClosed() && !state.isOffline()) {
int setOfflineInZooKeeper(final RegionState state,
boolean hijack) {
// In case of reassignment the current state in memory need not be
// OFFLINE.
if (!hijack && !state.isClosed() && !state.isOffline()) {
this.master.abort("Unexpected state trying to OFFLINE; " + state,
new IllegalStateException());
return false;
new IllegalStateException());
return -1;
}
state.update(RegionState.State.OFFLINE);
boolean allowZNodeCreation = false;
// Under reassignment if the current state is PENDING_OPEN
// or OPENING then refresh the in-memory state to PENDING_OPEN. This is
// important because if the region was in
// RS_OPENING state for a long time the master will try to force the znode
// to OFFLINE state meanwhile the RS could have opened the corresponding
// region and the state in znode will be RS_ZK_REGION_OPENED.
// For all other cases we can change the in-memory state to OFFLINE.
if (hijack &&
(state.getState().equals(RegionState.State.PENDING_OPEN) ||
state.getState().equals(RegionState.State.OPENING))) {
state.update(RegionState.State.PENDING_OPEN);
allowZNodeCreation = false;
} else {
state.update(RegionState.State.OFFLINE);
allowZNodeCreation = true;
}
int versionOfOfflineNode = -1;
try {
if(!ZKAssign.createOrForceNodeOffline(master.getZooKeeper(),
state.getRegion(), this.master.getServerName())) {
LOG.warn("Attempted to create/force node into OFFLINE state before " +
"completing assignment but failed to do so for " + state);
return false;
// get the version after setting the znode to OFFLINE
versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(master.getZooKeeper(),
state.getRegion(), this.master.getServerName(),
hijack, allowZNodeCreation);
if (versionOfOfflineNode == -1) {
LOG.warn("Attempted to create/force node into OFFLINE state before "
+ "completing assignment but failed to do so for " + state);
return -1;
}
} catch (KeeperException e) {
master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
return false;
return -1;
}
return true;
return versionOfOfflineNode;
}
/**
@ -2279,134 +2388,119 @@ public class AssignmentManager extends ZooKeeperListener {
protected void chore() {
// If bulkAssign in progress, suspend checks
if (this.bulkAssign) return;
List<HRegionInfo> unassigns = new ArrayList<HRegionInfo>();
Map<HRegionInfo, Boolean> assigns =
new HashMap<HRegionInfo, Boolean>();
synchronized (regionsInTransition) {
// Iterate all regions in transition checking for time outs
long now = System.currentTimeMillis();
for (RegionState regionState : regionsInTransition.values()) {
if (regionState.getStamp() + timeout <= now) {
HRegionInfo regionInfo = regionState.getRegion();
LOG.info("Regions in transition timed out: " + regionState);
// Expired! Do a retry.
switch (regionState.getState()) {
case CLOSED:
LOG.info("Region " + regionInfo.getEncodedName() +
" has been CLOSED for too long, waiting on queued " +
"ClosedRegionHandler to run or server shutdown");
// Update our timestamp.
regionState.updateTimestampToNow();
break;
case OFFLINE:
LOG.info("Region has been OFFLINE for too long, " +
"reassigning " + regionInfo.getRegionNameAsString() +
" to a random server");
assigns.put(regionState.getRegion(), Boolean.FALSE);
break;
case PENDING_OPEN:
LOG.info("Region has been PENDING_OPEN for too " +
"long, reassigning region=" +
regionInfo.getRegionNameAsString());
assigns.put(regionState.getRegion(), Boolean.TRUE);
break;
case OPENING:
LOG.info("Region has been OPENING for too " +
"long, reassigning region=" +
regionInfo.getRegionNameAsString());
// Should have a ZK node in OPENING state
try {
String node = ZKAssign.getNodeName(watcher,
regionInfo.getEncodedName());
Stat stat = new Stat();
RegionTransitionData data = ZKAssign.getDataNoWatch(watcher,
node, stat);
if (data == null) {
LOG.warn("Data is null, node " + node + " no longer exists");
break;
}
if (data.getEventType() == EventType.RS_ZK_REGION_OPENED) {
LOG.debug("Region has transitioned to OPENED, allowing " +
"watched event handlers to process");
break;
} else if (data.getEventType() !=
EventType.RS_ZK_REGION_OPENING) {
LOG.warn("While timing out a region in state OPENING, " +
"found ZK node in unexpected state: " +
data.getEventType());
break;
}
// Attempt to transition node into OFFLINE
try {
data = new RegionTransitionData(
EventType.M_ZK_REGION_OFFLINE, regionInfo.getRegionName(),
master.getServerName());
if (ZKUtil.setData(watcher, node, data.getBytes(),
stat.getVersion())) {
// Node is now OFFLINE, let's trigger another assignment
ZKUtil.getDataAndWatch(watcher, node); // re-set the watch
LOG.info("Successfully transitioned region=" +
regionInfo.getRegionNameAsString() + " into OFFLINE" +
" and forcing a new assignment");
assigns.put(regionState.getRegion(), Boolean.TRUE);
}
} catch (KeeperException.NoNodeException nne) {
// Node did not exist, can't time this out
}
} catch (KeeperException ke) {
LOG.error("Unexpected ZK exception timing out CLOSING region",
ke);
break;
}
break;
case OPEN:
LOG.error("Region has been OPEN for too long, " +
"we don't know where region was opened so can't do anything");
synchronized(regionState) {
regionState.updateTimestampToNow();
}
break;
case PENDING_CLOSE:
LOG.info("Region has been PENDING_CLOSE for too " +
"long, running forced unassign again on region=" +
regionInfo.getRegionNameAsString());
try {
// If the server got the RPC, it will transition the node
// to CLOSING, so only do something here if no node exists
if (!ZKUtil.watchAndCheckExists(watcher,
ZKAssign.getNodeName(watcher, regionInfo.getEncodedName()))) {
// Queue running of an unassign -- do actual unassign
// outside of the regionsInTransition lock.
unassigns.add(regionInfo);
}
} catch (NoNodeException e) {
LOG.debug("Node no longer existed so not forcing another " +
"unassignment");
} catch (KeeperException e) {
LOG.warn("Unexpected ZK exception timing out a region " +
"close", e);
}
break;
case CLOSING:
LOG.info("Region has been CLOSING for too " +
"long, this should eventually complete or the server will " +
"expire, doing nothing");
break;
}
//decide on action upon timeout
actOnTimeOut(regionState);
}
}
}
// Finish the work for regions in PENDING_CLOSE state
for (HRegionInfo hri: unassigns) {
unassign(hri, true);
}
for (Map.Entry<HRegionInfo, Boolean> e: assigns.entrySet()){
assign(e.getKey(), false, e.getValue());
}
private void actOnTimeOut(RegionState regionState) {
HRegionInfo regionInfo = regionState.getRegion();
LOG.info("Regions in transition timed out: " + regionState);
// Expired! Do a retry.
switch (regionState.getState()) {
case CLOSED:
LOG.info("Region " + regionInfo.getEncodedName()
+ " has been CLOSED for too long, waiting on queued "
+ "ClosedRegionHandler to run or server shutdown");
// Update our timestamp.
regionState.updateTimestampToNow();
break;
case OFFLINE:
LOG.info("Region has been OFFLINE for too long, " + "reassigning "
+ regionInfo.getRegionNameAsString() + " to a random server");
invokeAssign(regionInfo);
break;
case PENDING_OPEN:
LOG.info("Region has been PENDING_OPEN for too "
+ "long, reassigning region=" + regionInfo.getRegionNameAsString());
invokeAssign(regionInfo);
break;
case OPENING:
processOpeningState(regionInfo);
break;
case OPEN:
LOG.error("Region has been OPEN for too long, " +
"we don't know where region was opened so can't do anything");
synchronized (regionState) {
regionState.updateTimestampToNow();
}
break;
case PENDING_CLOSE:
LOG.info("Region has been PENDING_CLOSE for too "
+ "long, running forced unassign again on region="
+ regionInfo.getRegionNameAsString());
try {
// If the server got the RPC, it will transition the node
// to CLOSING, so only do something here if no node exists
if (!ZKUtil.watchAndCheckExists(watcher,
ZKAssign.getNodeName(watcher, regionInfo.getEncodedName()))) {
// Queue running of an unassign -- do actual unassign
// outside of the regionsInTransition lock.
invokeUnassign(regionInfo);
}
} catch (NoNodeException e) {
LOG.debug("Node no longer existed so not forcing another "
+ "unassignment");
} catch (KeeperException e) {
LOG.warn("Unexpected ZK exception timing out a region close", e);
}
break;
case CLOSING:
LOG.info("Region has been CLOSING for too " +
"long, this should eventually complete or the server will " +
"expire, doing nothing");
break;
}
}
}
private void processOpeningState(HRegionInfo regionInfo) {
LOG.info("Region has been OPENING for too " + "long, reassigning region="
+ regionInfo.getRegionNameAsString());
// Should have a ZK node in OPENING state
try {
String node = ZKAssign.getNodeName(watcher, regionInfo.getEncodedName());
Stat stat = new Stat();
RegionTransitionData dataInZNode = ZKAssign.getDataNoWatch(watcher, node,
stat);
if (dataInZNode == null) {
LOG.warn("Data is null, node " + node + " no longer exists");
return;
}
if (dataInZNode.getEventType() == EventType.RS_ZK_REGION_OPENED) {
LOG.debug("Region has transitioned to OPENED, allowing "
+ "watched event handlers to process");
return;
} else if (dataInZNode.getEventType() != EventType.RS_ZK_REGION_OPENING) {
LOG.warn("While timing out a region in state OPENING, "
+ "found ZK node in unexpected state: "
+ dataInZNode.getEventType());
return;
}
invokeAssign(regionInfo);
} catch (KeeperException ke) {
LOG.error("Unexpected ZK exception timing out CLOSING region", ke);
return;
}
return;
}
private void invokeAssign(HRegionInfo regionInfo) {
threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
}
private void invokeUnassign(HRegionInfo regionInfo) {
threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
}
/**
* Process shutdown server removing any assignments.
* @param sn Server that went down.
@ -2697,4 +2791,12 @@ public class AssignmentManager extends ZooKeeperListener {
public boolean isServerOnline(ServerName serverName) {
return this.serverManager.isServerOnline(serverName);
}
/**
* Shutdown the threadpool executor service
*/
public void shutdown() {
if (null != threadPoolExecutorService) {
this.threadPoolExecutorService.shutdown();
}
}
}

View File

@ -1265,6 +1265,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
LOG.error("Error call master coprocessor preShutdown()", ioe);
}
}
this.assignmentManager.shutdown();
this.serverManager.shutdownCluster();
try {
this.clusterStatusTracker.setClusterDown();

View File

@ -392,8 +392,11 @@ public class ServerManager {
* <p>
* @param server server to open a region
* @param region region to open
* @param versionOfOfflineNode that needs to be present in the offline node
* when RS tries to change the state from OFFLINE to other states.
*/
public RegionOpeningState sendRegionOpen(final ServerName server, HRegionInfo region)
public RegionOpeningState sendRegionOpen(final ServerName server,
HRegionInfo region, int versionOfOfflineNode)
throws IOException {
HRegionInterface hri = getServerConnection(server);
if (hri == null) {
@ -401,7 +404,8 @@ public class ServerManager {
" failed because no RPC connection found to this server");
return RegionOpeningState.FAILED_OPENING;
}
return hri.openRegion(region);
return (versionOfOfflineNode == -1) ? hri.openRegion(region) : hri
.openRegion(region, versionOfOfflineNode);
}
/**

View File

@ -0,0 +1,46 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.util.concurrent.Callable;
import org.apache.hadoop.hbase.HRegionInfo;
/**
* A callable object that invokes the corresponding action that needs to be
* taken for unassignment of a region in transition. Implementing as future
* callable we are able to act on the timeout asynchronously.
*/
public class UnAssignCallable implements Callable<Object> {
private AssignmentManager assignmentManager;
private HRegionInfo hri;
public UnAssignCallable(AssignmentManager assignmentManager, HRegionInfo hri) {
this.assignmentManager = assignmentManager;
this.hri = hri;
}
@Override
public Object call() throws Exception {
assignmentManager.unassign(hri);
return null;
}
}

View File

@ -2336,6 +2336,12 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
@QosPriority(priority=HIGH_QOS)
public RegionOpeningState openRegion(HRegionInfo region)
throws IOException {
return openRegion(region, -1);
}
@Override
@QosPriority(priority = HIGH_QOS)
public RegionOpeningState openRegion(HRegionInfo region, int versionOfOfflineNode)
throws IOException {
checkOpen();
if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) {
throw new RegionAlreadyInTransitionException("open", region.getEncodedName());
@ -2350,12 +2356,16 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
region.getRegionNameAsString());
this.regionsInTransitionInRS.add(region.getEncodedNameAsBytes());
HTableDescriptor htd = this.tableDescriptors.get(region.getTableName());
// Need to pass the expected version in the constructor.
if (region.isRootRegion()) {
this.service.submit(new OpenRootHandler(this, this, region, htd));
} else if(region.isMetaRegion()) {
this.service.submit(new OpenMetaHandler(this, this, region, htd));
this.service.submit(new OpenRootHandler(this, this, region, htd,
versionOfOfflineNode));
} else if (region.isMetaRegion()) {
this.service.submit(new OpenMetaHandler(this, this, region, htd,
versionOfOfflineNode));
} else {
this.service.submit(new OpenRegionHandler(this, this, region, htd));
this.service.submit(new OpenRegionHandler(this, this, region, htd,
versionOfOfflineNode));
}
return RegionOpeningState.OPENED;
}
@ -3104,7 +3114,4 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
HLog wal = this.getWAL();
return wal.rollWriter(true);
}
}

View File

@ -33,6 +33,12 @@ public class OpenMetaHandler extends OpenRegionHandler {
public OpenMetaHandler(final Server server,
final RegionServerServices rsServices, HRegionInfo regionInfo,
final HTableDescriptor htd) {
super(server,rsServices, regionInfo, htd, EventType.M_RS_OPEN_META);
this(server, rsServices, regionInfo, htd, -1);
}
public OpenMetaHandler(final Server server,
final RegionServerServices rsServices, HRegionInfo regionInfo,
final HTableDescriptor htd, int versionOfOfflineNode) {
super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_META,
versionOfOfflineNode);
}
}

View File

@ -52,20 +52,30 @@ public class OpenRegionHandler extends EventHandler {
// the total open. We'll fail the open if someone hijacks our znode; we can
// tell this has happened if version is not as expected.
private volatile int version = -1;
//version of the offline node that was set by the master
private volatile int versionOfOfflineNode = -1;
public OpenRegionHandler(final Server server,
final RegionServerServices rsServices, HRegionInfo regionInfo,
HTableDescriptor htd) {
this (server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION);
this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION, -1);
}
public OpenRegionHandler(final Server server,
final RegionServerServices rsServices, HRegionInfo regionInfo,
HTableDescriptor htd, int versionOfOfflineNode) {
this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION,
versionOfOfflineNode);
}
protected OpenRegionHandler(final Server server,
final RegionServerServices rsServices, final HRegionInfo regionInfo,
final HTableDescriptor htd, EventType eventType) {
final HTableDescriptor htd, EventType eventType,
final int versionOfOfflineNode) {
super(server, eventType);
this.rsServices = rsServices;
this.regionInfo = regionInfo;
this.htd = htd;
this.versionOfOfflineNode = versionOfOfflineNode;
}
public HRegionInfo getRegionInfo() {
@ -86,7 +96,8 @@ public class OpenRegionHandler extends EventHandler {
// If fails, just return. Someone stole the region from under us.
// Calling transitionZookeeperOfflineToOpening initalizes this.version.
if (!transitionZookeeperOfflineToOpening(encodedName)) {
if (!transitionZookeeperOfflineToOpening(encodedName,
versionOfOfflineNode)) {
LOG.warn("Region was hijacked? It no longer exists, encodedName=" +
encodedName);
return;
@ -325,15 +336,18 @@ public class OpenRegionHandler extends EventHandler {
* Transition ZK node from OFFLINE to OPENING.
* @param encodedName Name of the znode file (Region encodedName is the znode
* name).
* @param versionOfOfflineNode - version Of OfflineNode that needs to be compared
* before changing the node's state from OFFLINE
* @return True if successful transition.
*/
boolean transitionZookeeperOfflineToOpening(final String encodedName) {
boolean transitionZookeeperOfflineToOpening(final String encodedName,
int versionOfOfflineNode) {
// TODO: should also handle transition from CLOSED?
try {
// Initialize the znode version.
this.version =
ZKAssign.transitionNodeOpening(server.getZooKeeper(),
regionInfo, server.getServerName());
this.version = ZKAssign.transitionNode(server.getZooKeeper(), regionInfo,
server.getServerName(), EventType.M_ZK_REGION_OFFLINE,
EventType.RS_ZK_REGION_OPENING, versionOfOfflineNode);
} catch (KeeperException e) {
LOG.error("Error transition from OFFLINE to OPENING for region=" +
encodedName, e);

View File

@ -33,6 +33,12 @@ public class OpenRootHandler extends OpenRegionHandler {
public OpenRootHandler(final Server server,
final RegionServerServices rsServices, HRegionInfo regionInfo,
final HTableDescriptor htd) {
super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_ROOT);
super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_ROOT, -1);
}
public OpenRootHandler(final Server server,
final RegionServerServices rsServices, HRegionInfo regionInfo,
final HTableDescriptor htd, int versionOfOfflineNode) {
super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_ROOT,
versionOfOfflineNode);
}
}

View File

@ -203,7 +203,6 @@ public class ZKAssign {
ZKUtil.setData(zkw, node, data.getBytes());
}
/**
* Creates or force updates an unassigned node to the OFFLINE state for the
* specified region.
@ -219,36 +218,108 @@ public class ZKAssign {
* @param zkw zk reference
* @param region region to be created as offline
* @param serverName server event originates from
* @return the version of the znode created in OFFLINE state, -1 if
* unsuccessful.
* @throws KeeperException if unexpected zookeeper exception
* @throws KeeperException.NodeExistsException if node already exists
*/
public static boolean createOrForceNodeOffline(ZooKeeperWatcher zkw,
HRegionInfo region, ServerName serverName)
public static int createOrForceNodeOffline(ZooKeeperWatcher zkw,
HRegionInfo region, ServerName serverName) throws KeeperException {
return createOrForceNodeOffline(zkw, region, serverName, false, true);
}
/**
* Creates or force updates an unassigned node to the OFFLINE state for the
* specified region.
* <p>
* Attempts to create the node but if it exists will force it to transition to
* and OFFLINE state.
* <p>
* Sets a watcher on the unassigned region node if the method is successful.
*
* <p>
* This method should be used when assigning a region.
*
* @param zkw
* zk reference
* @param region
* region to be created as offline
* @param serverName
* server event originates from
* @param hijack
* - true if to be hijacked and reassigned, false otherwise
* @param allowCreation
* - true if the node has to be created newly, false otherwise
* @throws KeeperException
* if unexpected zookeeper exception
* @return the version of the znode created in OFFLINE state, -1 if
* unsuccessful.
* @throws KeeperException.NodeExistsException
* if node already exists
*/
public static int createOrForceNodeOffline(ZooKeeperWatcher zkw,
HRegionInfo region, ServerName serverName,
boolean hijack, boolean allowCreation)
throws KeeperException {
LOG.debug(zkw.prefix("Creating (or updating) unassigned node for " +
region.getEncodedName() + " with OFFLINE state"));
RegionTransitionData data = new RegionTransitionData(
EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName);
String node = getNodeName(zkw, region.getEncodedName());
Stat stat = new Stat();
zkw.sync(node);
int version = ZKUtil.checkExists(zkw, node);
if (version == -1) {
ZKUtil.createAndWatch(zkw, node, data.getBytes());
// While trying to transit a node to OFFLINE that was in previously in
// OPENING state but before it could transit to OFFLINE state if RS had
// opened the region then the Master deletes the assigned region znode.
// In that case the znode will not exist. So we should not
// create the znode again which will lead to double assignment.
if (hijack && !allowCreation) {
return -1;
}
return ZKUtil.createAndWatch(zkw, node, data.getBytes());
} else {
if (!ZKUtil.setData(zkw, node, data.getBytes(), version)) {
return false;
RegionTransitionData curDataInZNode = ZKAssign.getDataNoWatch(zkw, region
.getEncodedName(), stat);
// Do not move the node to OFFLINE if znode is in any of the following
// state.
// Because these are already executed states.
if (hijack && null != curDataInZNode) {
EventType eventType = curDataInZNode.getEventType();
if (eventType.equals(EventType.RS_ZK_REGION_CLOSING)
|| eventType.equals(EventType.RS_ZK_REGION_CLOSED)
|| eventType.equals(EventType.RS_ZK_REGION_OPENED)) {
return -1;
}
}
boolean setData = false;
try {
setData = ZKUtil.setData(zkw, node, data.getBytes(), version);
// Setdata throws KeeperException which aborts the Master. So we are
// catching it here.
// If just before setting the znode to OFFLINE if the RS has made any
// change to the
// znode state then we need to return -1.
} catch (KeeperException kpe) {
LOG.info("Version mismatch while setting the node to OFFLINE state.");
return -1;
}
if (!setData) {
return -1;
} else {
// We successfully forced to OFFLINE, reset watch and handle if
// the state changed in between our set and the watch
RegionTransitionData curData =
ZKAssign.getData(zkw, region.getEncodedName());
ZKAssign.getData(zkw, region.getEncodedName());
if (curData.getEventType() != data.getEventType()) {
// state changed, need to process
return false;
return -1;
}
}
}
return true;
return stat.getVersion() + 1;
}
/**
@ -673,6 +744,18 @@ public class ZKAssign {
"the node existed but was version " + stat.getVersion() +
" not the expected version " + expectedVersion));
return -1;
} else if (beginState.equals(EventType.M_ZK_REGION_OFFLINE)
&& endState.equals(EventType.RS_ZK_REGION_OPENING)
&& expectedVersion == -1 && stat.getVersion() != 0) {
// the below check ensures that double assignment doesnot happen.
// When the node is created for the first time then the expected version
// that is passed will be -1 and the version in znode will be 0.
// In all other cases the version in znode will be > 0.
LOG.warn(zkw.prefix("Attempt to transition the " + "unassigned node for "
+ encoded + " from " + beginState + " to " + endState + " failed, "
+ "the node existed but was version " + stat.getVersion()
+ " not the expected version " + expectedVersion));
return -1;
}
// Verify it is in expected state