diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java b/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java index 04980b5f344..3679c026404 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java @@ -333,7 +333,7 @@ public interface HRegionInterface extends VersionedProtocol, Stoppable, Abortabl * @param region * region to open * @return RegionOpeningState - * OPENED - if region opened succesfully. + * OPENED - if region open request was successful. * ALREADY_OPENED - if the region was already opened. * FAILED_OPENING - if region opening failed. * @@ -341,6 +341,22 @@ public interface HRegionInterface extends VersionedProtocol, Stoppable, Abortabl */ public RegionOpeningState openRegion(final HRegionInfo region) throws IOException; + /** + * Opens the specified region. + * @param region + * region to open + * @param versionOfOfflineNode + * the version of znode to compare when RS transitions the znode from + * OFFLINE state. + * @return RegionOpeningState + * OPENED - if region open request was successful. + * ALREADY_OPENED - if the region was already opened. + * FAILED_OPENING - if region opening failed. + * @throws IOException + */ + public RegionOpeningState openRegion(HRegionInfo region, int versionOfOfflineNode) + throws IOException; + /** * Opens the specified regions. * @param regions regions to open diff --git a/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java b/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java new file mode 100644 index 00000000000..b233d105bab --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java @@ -0,0 +1,47 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.util.concurrent.Callable; + +import org.apache.hadoop.hbase.HRegionInfo; + +/** + * A callable object that invokes the corresponding action that needs to be + * taken for assignment of a region in transition. + * Implementing as future callable we are able to act on the timeout + * asynchronously. + */ +public class AssignCallable implements Callable { + private AssignmentManager assignmentManager; + + private HRegionInfo hri; + + public AssignCallable(AssignmentManager assignmentManager, HRegionInfo hri) { + this.assignmentManager = assignmentManager; + this.hri = hri; + } + + @Override + public Object call() throws Exception { + assignmentManager.assign(hri, true, true, true); + return null; + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index c18f9138a84..c0170b4e233 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -38,6 +38,7 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -159,6 +160,9 @@ public class AssignmentManager extends ZooKeeperListener { private final ExecutorService executorService; + //Thread pool executor service for timeout monitor + private java.util.concurrent.ExecutorService threadPoolExecutorService; + /** * Constructs a new assignment manager. * @@ -190,6 +194,7 @@ public class AssignmentManager extends ZooKeeperListener { this.maximumAssignmentAttempts = this.master.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10); this.balancer = LoadBalancerFactory.getLoadBalancer(conf); + this.threadPoolExecutorService = Executors.newCachedThreadPool(); } /** @@ -475,9 +480,20 @@ public class AssignmentManager extends ZooKeeperListener { // Just insert region into RIT // If this never updates the timeout will trigger new assignment - regionsInTransition.put(encodedRegionName, new RegionState( - regionInfo, RegionState.State.OPENING, - data.getStamp(), data.getOrigin())); + if (regionInfo.isMetaRegion() || regionInfo.isRootRegion()) { + regionsInTransition.put(encodedRegionName, new RegionState( + regionInfo, RegionState.State.OPENING, data.getStamp(), data + .getOrigin())); + // If ROOT or .META. table is waiting for timeout monitor to assign + // it may take lot of time when the assignment.timeout.period is + // the default value which may be very long. We will not be able + // to serve any request during this time. + // So we will assign the ROOT and .META. region immediately. + processOpeningState(regionInfo); + break; + } + regionsInTransition.put(encodedRegionName, new RegionState(regionInfo, + RegionState.State.OPENING, data.getStamp(), data.getOrigin())); break; case RS_ZK_REGION_OPENED: @@ -1109,12 +1125,21 @@ public class AssignmentManager extends ZooKeeperListener { public void assign(HRegionInfo region, boolean setOfflineInZK, boolean forceNewPlan) { - String tableName = region.getTableNameAsString(); - boolean disabled = this.zkTable.isDisabledTable(tableName); - if (disabled || this.zkTable.isDisablingTable(tableName)) { - LOG.info("Table " + tableName + (disabled? " disabled;": " disabling;") + - " skipping assign of " + region.getRegionNameAsString()); - offlineDisabledRegion(region); + assign(region, setOfflineInZK, forceNewPlan, false); + } + + /** + * @param region + * @param setOfflineInZK + * @param forceNewPlan + * @param hijack + * - true new assignment is needed, false otherwise + */ + public void assign(HRegionInfo region, boolean setOfflineInZK, + boolean forceNewPlan, boolean hijack) { + //If hijack is true do not call disableRegionIfInRIT as + // we have not yet moved the znode to OFFLINE state. + if (!hijack && isDisabledorDisablingRegionInRIT(region)) { return; } if (this.serverManager.isClusterShutdown()) { @@ -1122,9 +1147,10 @@ public class AssignmentManager extends ZooKeeperListener { region.getRegionNameAsString()); return; } - RegionState state = addToRegionsInTransition(region); + RegionState state = addToRegionsInTransition(region, + hijack); synchronized (state) { - assign(state, setOfflineInZK, forceNewPlan); + assign(region, state, setOfflineInZK, forceNewPlan, hijack); } } @@ -1282,11 +1308,19 @@ public class AssignmentManager extends ZooKeeperListener { * @return The current RegionState */ private RegionState addToRegionsInTransition(final HRegionInfo region) { + return addToRegionsInTransition(region, false); + } + /** + * @param region + * @param hijack + * @return The current RegionState + */ + private RegionState addToRegionsInTransition(final HRegionInfo region, + boolean hijack) { synchronized (regionsInTransition) { - return forceRegionStateToOffline(region); + return forceRegionStateToOffline(region, hijack); } } - /** * Sets regions {@link RegionState} to {@link RegionState.State#OFFLINE}. * Caller must hold lock on this.regionsInTransition. @@ -1294,14 +1328,32 @@ public class AssignmentManager extends ZooKeeperListener { * @return Amended RegionState. */ private RegionState forceRegionStateToOffline(final HRegionInfo region) { + return forceRegionStateToOffline(region, false); + } + + /** + * Sets regions {@link RegionState} to {@link RegionState.State#OFFLINE}. + * Caller must hold lock on this.regionsInTransition. + * @param region + * @param hijack + * @return Amended RegionState. + */ + private RegionState forceRegionStateToOffline(final HRegionInfo region, + boolean hijack) { String encodedName = region.getEncodedName(); RegionState state = this.regionsInTransition.get(encodedName); if (state == null) { state = new RegionState(region, RegionState.State.OFFLINE); this.regionsInTransition.put(encodedName, state); } else { - LOG.debug("Forcing OFFLINE; was=" + state); - state.update(RegionState.State.OFFLINE); + // If we are reassigning the node do not force in-memory state to OFFLINE. + // Based on the znode state we will decide if to change + // in-memory state to OFFLINE or not. It will + // be done before setting the znode to OFFLINE state. + if (!hijack) { + LOG.debug("Forcing OFFLINE; was=" + state); + state.update(RegionState.State.OFFLINE); + } } return state; } @@ -1311,11 +1363,29 @@ public class AssignmentManager extends ZooKeeperListener { * @param state * @param setOfflineInZK * @param forceNewPlan + * @param hijack */ - private void assign(final RegionState state, final boolean setOfflineInZK, - final boolean forceNewPlan) { + private void assign(final HRegionInfo region, final RegionState state, + final boolean setOfflineInZK, final boolean forceNewPlan, + boolean hijack) { for (int i = 0; i < this.maximumAssignmentAttempts; i++) { - if (setOfflineInZK && !setOfflineInZooKeeper(state)) return; + int versionOfOfflineNode = -1; + if (setOfflineInZK) { + // get the version of the znode after setting it to OFFLINE. + // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE + versionOfOfflineNode = setOfflineInZooKeeper(state, + hijack); + if(versionOfOfflineNode != -1){ + if (isDisabledorDisablingRegionInRIT(region)) { + return; + } + } + } + + if (setOfflineInZK && versionOfOfflineNode == -1) { + return; + } + if (this.master.isStopped()) { LOG.debug("Server stopped; skipping assign of " + state); return; @@ -1334,8 +1404,9 @@ public class AssignmentManager extends ZooKeeperListener { state.update(RegionState.State.PENDING_OPEN, System.currentTimeMillis(), plan.getDestination()); // Send OPEN RPC. This can fail if the server on other end is is not up. + // Pass the version that was obtained while setting the node to OFFLINE. RegionOpeningState regionOpenState = serverManager.sendRegionOpen(plan - .getDestination(), state.getRegion()); + .getDestination(), state.getRegion(), versionOfOfflineNode); if (regionOpenState == RegionOpeningState.ALREADY_OPENED) { // Remove region from in-memory transition and unassigned node from ZK // While trying to enable the table the regions of the table were @@ -1389,31 +1460,69 @@ public class AssignmentManager extends ZooKeeperListener { } } + private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) { + String tableName = region.getTableNameAsString(); + boolean disabled = this.zkTable.isDisabledTable(tableName); + if (disabled || this.zkTable.isDisablingTable(tableName)) { + LOG.info("Table " + tableName + (disabled ? " disabled;" : " disabling;") + + " skipping assign of " + region.getRegionNameAsString()); + offlineDisabledRegion(region); + return true; + } + return false; + } + /** * Set region as OFFLINED up in zookeeper + * * @param state - * @return True if we succeeded, false otherwise (State was incorrect or failed - * updating zk). + * @param hijack + * - true if needs to be hijacked and reassigned, false otherwise. + * @return the version of the offline node if setting of the OFFLINE node was + * successful, -1 otherwise. */ - boolean setOfflineInZooKeeper(final RegionState state) { - if (!state.isClosed() && !state.isOffline()) { + int setOfflineInZooKeeper(final RegionState state, + boolean hijack) { + // In case of reassignment the current state in memory need not be + // OFFLINE. + if (!hijack && !state.isClosed() && !state.isOffline()) { this.master.abort("Unexpected state trying to OFFLINE; " + state, - new IllegalStateException()); - return false; + new IllegalStateException()); + return -1; } - state.update(RegionState.State.OFFLINE); + boolean allowZNodeCreation = false; + // Under reassignment if the current state is PENDING_OPEN + // or OPENING then refresh the in-memory state to PENDING_OPEN. This is + // important because if the region was in + // RS_OPENING state for a long time the master will try to force the znode + // to OFFLINE state meanwhile the RS could have opened the corresponding + // region and the state in znode will be RS_ZK_REGION_OPENED. + // For all other cases we can change the in-memory state to OFFLINE. + if (hijack && + (state.getState().equals(RegionState.State.PENDING_OPEN) || + state.getState().equals(RegionState.State.OPENING))) { + state.update(RegionState.State.PENDING_OPEN); + allowZNodeCreation = false; + } else { + state.update(RegionState.State.OFFLINE); + allowZNodeCreation = true; + } + int versionOfOfflineNode = -1; try { - if(!ZKAssign.createOrForceNodeOffline(master.getZooKeeper(), - state.getRegion(), this.master.getServerName())) { - LOG.warn("Attempted to create/force node into OFFLINE state before " + - "completing assignment but failed to do so for " + state); - return false; + // get the version after setting the znode to OFFLINE + versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(master.getZooKeeper(), + state.getRegion(), this.master.getServerName(), + hijack, allowZNodeCreation); + if (versionOfOfflineNode == -1) { + LOG.warn("Attempted to create/force node into OFFLINE state before " + + "completing assignment but failed to do so for " + state); + return -1; } } catch (KeeperException e) { master.abort("Unexpected ZK exception creating/setting node OFFLINE", e); - return false; + return -1; } - return true; + return versionOfOfflineNode; } /** @@ -2279,134 +2388,119 @@ public class AssignmentManager extends ZooKeeperListener { protected void chore() { // If bulkAssign in progress, suspend checks if (this.bulkAssign) return; - List unassigns = new ArrayList(); - Map assigns = - new HashMap(); synchronized (regionsInTransition) { // Iterate all regions in transition checking for time outs long now = System.currentTimeMillis(); for (RegionState regionState : regionsInTransition.values()) { if (regionState.getStamp() + timeout <= now) { - HRegionInfo regionInfo = regionState.getRegion(); - LOG.info("Regions in transition timed out: " + regionState); - // Expired! Do a retry. - switch (regionState.getState()) { - case CLOSED: - LOG.info("Region " + regionInfo.getEncodedName() + - " has been CLOSED for too long, waiting on queued " + - "ClosedRegionHandler to run or server shutdown"); - // Update our timestamp. - regionState.updateTimestampToNow(); - break; - case OFFLINE: - LOG.info("Region has been OFFLINE for too long, " + - "reassigning " + regionInfo.getRegionNameAsString() + - " to a random server"); - assigns.put(regionState.getRegion(), Boolean.FALSE); - break; - case PENDING_OPEN: - LOG.info("Region has been PENDING_OPEN for too " + - "long, reassigning region=" + - regionInfo.getRegionNameAsString()); - assigns.put(regionState.getRegion(), Boolean.TRUE); - break; - case OPENING: - LOG.info("Region has been OPENING for too " + - "long, reassigning region=" + - regionInfo.getRegionNameAsString()); - // Should have a ZK node in OPENING state - try { - String node = ZKAssign.getNodeName(watcher, - regionInfo.getEncodedName()); - Stat stat = new Stat(); - RegionTransitionData data = ZKAssign.getDataNoWatch(watcher, - node, stat); - if (data == null) { - LOG.warn("Data is null, node " + node + " no longer exists"); - break; - } - if (data.getEventType() == EventType.RS_ZK_REGION_OPENED) { - LOG.debug("Region has transitioned to OPENED, allowing " + - "watched event handlers to process"); - break; - } else if (data.getEventType() != - EventType.RS_ZK_REGION_OPENING) { - LOG.warn("While timing out a region in state OPENING, " + - "found ZK node in unexpected state: " + - data.getEventType()); - break; - } - // Attempt to transition node into OFFLINE - try { - data = new RegionTransitionData( - EventType.M_ZK_REGION_OFFLINE, regionInfo.getRegionName(), - master.getServerName()); - if (ZKUtil.setData(watcher, node, data.getBytes(), - stat.getVersion())) { - // Node is now OFFLINE, let's trigger another assignment - ZKUtil.getDataAndWatch(watcher, node); // re-set the watch - LOG.info("Successfully transitioned region=" + - regionInfo.getRegionNameAsString() + " into OFFLINE" + - " and forcing a new assignment"); - assigns.put(regionState.getRegion(), Boolean.TRUE); - } - } catch (KeeperException.NoNodeException nne) { - // Node did not exist, can't time this out - } - } catch (KeeperException ke) { - LOG.error("Unexpected ZK exception timing out CLOSING region", - ke); - break; - } - break; - case OPEN: - LOG.error("Region has been OPEN for too long, " + - "we don't know where region was opened so can't do anything"); - synchronized(regionState) { - regionState.updateTimestampToNow(); - } - break; - - case PENDING_CLOSE: - LOG.info("Region has been PENDING_CLOSE for too " + - "long, running forced unassign again on region=" + - regionInfo.getRegionNameAsString()); - try { - // If the server got the RPC, it will transition the node - // to CLOSING, so only do something here if no node exists - if (!ZKUtil.watchAndCheckExists(watcher, - ZKAssign.getNodeName(watcher, regionInfo.getEncodedName()))) { - // Queue running of an unassign -- do actual unassign - // outside of the regionsInTransition lock. - unassigns.add(regionInfo); - } - } catch (NoNodeException e) { - LOG.debug("Node no longer existed so not forcing another " + - "unassignment"); - } catch (KeeperException e) { - LOG.warn("Unexpected ZK exception timing out a region " + - "close", e); - } - break; - case CLOSING: - LOG.info("Region has been CLOSING for too " + - "long, this should eventually complete or the server will " + - "expire, doing nothing"); - break; - } + //decide on action upon timeout + actOnTimeOut(regionState); } } } - // Finish the work for regions in PENDING_CLOSE state - for (HRegionInfo hri: unassigns) { - unassign(hri, true); - } - for (Map.Entry e: assigns.entrySet()){ - assign(e.getKey(), false, e.getValue()); + } + + private void actOnTimeOut(RegionState regionState) { + HRegionInfo regionInfo = regionState.getRegion(); + LOG.info("Regions in transition timed out: " + regionState); + // Expired! Do a retry. + switch (regionState.getState()) { + case CLOSED: + LOG.info("Region " + regionInfo.getEncodedName() + + " has been CLOSED for too long, waiting on queued " + + "ClosedRegionHandler to run or server shutdown"); + // Update our timestamp. + regionState.updateTimestampToNow(); + break; + case OFFLINE: + LOG.info("Region has been OFFLINE for too long, " + "reassigning " + + regionInfo.getRegionNameAsString() + " to a random server"); + invokeAssign(regionInfo); + break; + case PENDING_OPEN: + LOG.info("Region has been PENDING_OPEN for too " + + "long, reassigning region=" + regionInfo.getRegionNameAsString()); + invokeAssign(regionInfo); + break; + case OPENING: + processOpeningState(regionInfo); + break; + case OPEN: + LOG.error("Region has been OPEN for too long, " + + "we don't know where region was opened so can't do anything"); + synchronized (regionState) { + regionState.updateTimestampToNow(); + } + break; + + case PENDING_CLOSE: + LOG.info("Region has been PENDING_CLOSE for too " + + "long, running forced unassign again on region=" + + regionInfo.getRegionNameAsString()); + try { + // If the server got the RPC, it will transition the node + // to CLOSING, so only do something here if no node exists + if (!ZKUtil.watchAndCheckExists(watcher, + ZKAssign.getNodeName(watcher, regionInfo.getEncodedName()))) { + // Queue running of an unassign -- do actual unassign + // outside of the regionsInTransition lock. + invokeUnassign(regionInfo); + } + } catch (NoNodeException e) { + LOG.debug("Node no longer existed so not forcing another " + + "unassignment"); + } catch (KeeperException e) { + LOG.warn("Unexpected ZK exception timing out a region close", e); + } + break; + case CLOSING: + LOG.info("Region has been CLOSING for too " + + "long, this should eventually complete or the server will " + + "expire, doing nothing"); + break; } } } - + + private void processOpeningState(HRegionInfo regionInfo) { + LOG.info("Region has been OPENING for too " + "long, reassigning region=" + + regionInfo.getRegionNameAsString()); + // Should have a ZK node in OPENING state + try { + String node = ZKAssign.getNodeName(watcher, regionInfo.getEncodedName()); + Stat stat = new Stat(); + RegionTransitionData dataInZNode = ZKAssign.getDataNoWatch(watcher, node, + stat); + if (dataInZNode == null) { + LOG.warn("Data is null, node " + node + " no longer exists"); + return; + } + if (dataInZNode.getEventType() == EventType.RS_ZK_REGION_OPENED) { + LOG.debug("Region has transitioned to OPENED, allowing " + + "watched event handlers to process"); + return; + } else if (dataInZNode.getEventType() != EventType.RS_ZK_REGION_OPENING) { + LOG.warn("While timing out a region in state OPENING, " + + "found ZK node in unexpected state: " + + dataInZNode.getEventType()); + return; + } + invokeAssign(regionInfo); + } catch (KeeperException ke) { + LOG.error("Unexpected ZK exception timing out CLOSING region", ke); + return; + } + return; + } + + private void invokeAssign(HRegionInfo regionInfo) { + threadPoolExecutorService.submit(new AssignCallable(this, regionInfo)); + } + + private void invokeUnassign(HRegionInfo regionInfo) { + threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo)); + } + /** * Process shutdown server removing any assignments. * @param sn Server that went down. @@ -2697,4 +2791,12 @@ public class AssignmentManager extends ZooKeeperListener { public boolean isServerOnline(ServerName serverName) { return this.serverManager.isServerOnline(serverName); } + /** + * Shutdown the threadpool executor service + */ + public void shutdown() { + if (null != threadPoolExecutorService) { + this.threadPoolExecutorService.shutdown(); + } + } } diff --git a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index ba720526be3..9de55813b18 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -188,7 +188,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { private final ServerName serverName; private TableDescriptors tableDescriptors; - + /** * Initializes the HMaster. The steps are as follows: *

@@ -1265,6 +1265,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { LOG.error("Error call master coprocessor preShutdown()", ioe); } } + this.assignmentManager.shutdown(); this.serverManager.shutdownCluster(); try { this.clusterStatusTracker.setClusterDown(); diff --git a/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 57c1140dd39..f994f99f817 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -392,8 +392,11 @@ public class ServerManager { *

* @param server server to open a region * @param region region to open + * @param versionOfOfflineNode that needs to be present in the offline node + * when RS tries to change the state from OFFLINE to other states. */ - public RegionOpeningState sendRegionOpen(final ServerName server, HRegionInfo region) + public RegionOpeningState sendRegionOpen(final ServerName server, + HRegionInfo region, int versionOfOfflineNode) throws IOException { HRegionInterface hri = getServerConnection(server); if (hri == null) { @@ -401,7 +404,8 @@ public class ServerManager { " failed because no RPC connection found to this server"); return RegionOpeningState.FAILED_OPENING; } - return hri.openRegion(region); + return (versionOfOfflineNode == -1) ? hri.openRegion(region) : hri + .openRegion(region, versionOfOfflineNode); } /** diff --git a/src/main/java/org/apache/hadoop/hbase/master/UnAssignCallable.java b/src/main/java/org/apache/hadoop/hbase/master/UnAssignCallable.java new file mode 100644 index 00000000000..f1d7797a766 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/master/UnAssignCallable.java @@ -0,0 +1,46 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.util.concurrent.Callable; + +import org.apache.hadoop.hbase.HRegionInfo; + +/** + * A callable object that invokes the corresponding action that needs to be + * taken for unassignment of a region in transition. Implementing as future + * callable we are able to act on the timeout asynchronously. + */ +public class UnAssignCallable implements Callable { + private AssignmentManager assignmentManager; + + private HRegionInfo hri; + + public UnAssignCallable(AssignmentManager assignmentManager, HRegionInfo hri) { + this.assignmentManager = assignmentManager; + this.hri = hri; + } + + @Override + public Object call() throws Exception { + assignmentManager.unassign(hri); + return null; + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 83523f3b93d..cd809baa76c 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2336,6 +2336,12 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, @QosPriority(priority=HIGH_QOS) public RegionOpeningState openRegion(HRegionInfo region) throws IOException { + return openRegion(region, -1); + } + @Override + @QosPriority(priority = HIGH_QOS) + public RegionOpeningState openRegion(HRegionInfo region, int versionOfOfflineNode) + throws IOException { checkOpen(); if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) { throw new RegionAlreadyInTransitionException("open", region.getEncodedName()); @@ -2350,12 +2356,16 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, region.getRegionNameAsString()); this.regionsInTransitionInRS.add(region.getEncodedNameAsBytes()); HTableDescriptor htd = this.tableDescriptors.get(region.getTableName()); + // Need to pass the expected version in the constructor. if (region.isRootRegion()) { - this.service.submit(new OpenRootHandler(this, this, region, htd)); - } else if(region.isMetaRegion()) { - this.service.submit(new OpenMetaHandler(this, this, region, htd)); + this.service.submit(new OpenRootHandler(this, this, region, htd, + versionOfOfflineNode)); + } else if (region.isMetaRegion()) { + this.service.submit(new OpenMetaHandler(this, this, region, htd, + versionOfOfflineNode)); } else { - this.service.submit(new OpenRegionHandler(this, this, region, htd)); + this.service.submit(new OpenRegionHandler(this, this, region, htd, + versionOfOfflineNode)); } return RegionOpeningState.OPENED; } @@ -3104,7 +3114,4 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, HLog wal = this.getWAL(); return wal.rollWriter(true); } - - - } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java b/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java index 55f80c1c9ea..66e57069d15 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java @@ -33,6 +33,12 @@ public class OpenMetaHandler extends OpenRegionHandler { public OpenMetaHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo, final HTableDescriptor htd) { - super(server,rsServices, regionInfo, htd, EventType.M_RS_OPEN_META); + this(server, rsServices, regionInfo, htd, -1); + } + public OpenMetaHandler(final Server server, + final RegionServerServices rsServices, HRegionInfo regionInfo, + final HTableDescriptor htd, int versionOfOfflineNode) { + super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_META, + versionOfOfflineNode); } } \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java b/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java index 66ecbca5174..8f96baaecdb 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java @@ -52,20 +52,30 @@ public class OpenRegionHandler extends EventHandler { // the total open. We'll fail the open if someone hijacks our znode; we can // tell this has happened if version is not as expected. private volatile int version = -1; + //version of the offline node that was set by the master + private volatile int versionOfOfflineNode = -1; public OpenRegionHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo, HTableDescriptor htd) { - this (server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION); + this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION, -1); + } + public OpenRegionHandler(final Server server, + final RegionServerServices rsServices, HRegionInfo regionInfo, + HTableDescriptor htd, int versionOfOfflineNode) { + this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION, + versionOfOfflineNode); } protected OpenRegionHandler(final Server server, final RegionServerServices rsServices, final HRegionInfo regionInfo, - final HTableDescriptor htd, EventType eventType) { + final HTableDescriptor htd, EventType eventType, + final int versionOfOfflineNode) { super(server, eventType); this.rsServices = rsServices; this.regionInfo = regionInfo; this.htd = htd; + this.versionOfOfflineNode = versionOfOfflineNode; } public HRegionInfo getRegionInfo() { @@ -86,7 +96,8 @@ public class OpenRegionHandler extends EventHandler { // If fails, just return. Someone stole the region from under us. // Calling transitionZookeeperOfflineToOpening initalizes this.version. - if (!transitionZookeeperOfflineToOpening(encodedName)) { + if (!transitionZookeeperOfflineToOpening(encodedName, + versionOfOfflineNode)) { LOG.warn("Region was hijacked? It no longer exists, encodedName=" + encodedName); return; @@ -325,15 +336,18 @@ public class OpenRegionHandler extends EventHandler { * Transition ZK node from OFFLINE to OPENING. * @param encodedName Name of the znode file (Region encodedName is the znode * name). + * @param versionOfOfflineNode - version Of OfflineNode that needs to be compared + * before changing the node's state from OFFLINE * @return True if successful transition. */ - boolean transitionZookeeperOfflineToOpening(final String encodedName) { + boolean transitionZookeeperOfflineToOpening(final String encodedName, + int versionOfOfflineNode) { // TODO: should also handle transition from CLOSED? try { // Initialize the znode version. - this.version = - ZKAssign.transitionNodeOpening(server.getZooKeeper(), - regionInfo, server.getServerName()); + this.version = ZKAssign.transitionNode(server.getZooKeeper(), regionInfo, + server.getServerName(), EventType.M_ZK_REGION_OFFLINE, + EventType.RS_ZK_REGION_OPENING, versionOfOfflineNode); } catch (KeeperException e) { LOG.error("Error transition from OFFLINE to OPENING for region=" + encodedName, e); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRootHandler.java b/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRootHandler.java index 027d0776ff1..9a4f01a838f 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRootHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRootHandler.java @@ -33,6 +33,12 @@ public class OpenRootHandler extends OpenRegionHandler { public OpenRootHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo, final HTableDescriptor htd) { - super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_ROOT); + super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_ROOT, -1); + } + public OpenRootHandler(final Server server, + final RegionServerServices rsServices, HRegionInfo regionInfo, + final HTableDescriptor htd, int versionOfOfflineNode) { + super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_ROOT, + versionOfOfflineNode); } } diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java index 8ea537d33af..79edbfd9b0c 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java @@ -203,7 +203,6 @@ public class ZKAssign { ZKUtil.setData(zkw, node, data.getBytes()); } - /** * Creates or force updates an unassigned node to the OFFLINE state for the * specified region. @@ -219,36 +218,108 @@ public class ZKAssign { * @param zkw zk reference * @param region region to be created as offline * @param serverName server event originates from + * @return the version of the znode created in OFFLINE state, -1 if + * unsuccessful. * @throws KeeperException if unexpected zookeeper exception * @throws KeeperException.NodeExistsException if node already exists */ - public static boolean createOrForceNodeOffline(ZooKeeperWatcher zkw, - HRegionInfo region, ServerName serverName) + public static int createOrForceNodeOffline(ZooKeeperWatcher zkw, + HRegionInfo region, ServerName serverName) throws KeeperException { + return createOrForceNodeOffline(zkw, region, serverName, false, true); + } + + /** + * Creates or force updates an unassigned node to the OFFLINE state for the + * specified region. + *

+ * Attempts to create the node but if it exists will force it to transition to + * and OFFLINE state. + *

+ * Sets a watcher on the unassigned region node if the method is successful. + * + *

+ * This method should be used when assigning a region. + * + * @param zkw + * zk reference + * @param region + * region to be created as offline + * @param serverName + * server event originates from + * @param hijack + * - true if to be hijacked and reassigned, false otherwise + * @param allowCreation + * - true if the node has to be created newly, false otherwise + * @throws KeeperException + * if unexpected zookeeper exception + * @return the version of the znode created in OFFLINE state, -1 if + * unsuccessful. + * @throws KeeperException.NodeExistsException + * if node already exists + */ + public static int createOrForceNodeOffline(ZooKeeperWatcher zkw, + HRegionInfo region, ServerName serverName, + boolean hijack, boolean allowCreation) throws KeeperException { LOG.debug(zkw.prefix("Creating (or updating) unassigned node for " + region.getEncodedName() + " with OFFLINE state")); RegionTransitionData data = new RegionTransitionData( EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName); String node = getNodeName(zkw, region.getEncodedName()); + Stat stat = new Stat(); zkw.sync(node); int version = ZKUtil.checkExists(zkw, node); if (version == -1) { - ZKUtil.createAndWatch(zkw, node, data.getBytes()); + // While trying to transit a node to OFFLINE that was in previously in + // OPENING state but before it could transit to OFFLINE state if RS had + // opened the region then the Master deletes the assigned region znode. + // In that case the znode will not exist. So we should not + // create the znode again which will lead to double assignment. + if (hijack && !allowCreation) { + return -1; + } + return ZKUtil.createAndWatch(zkw, node, data.getBytes()); } else { - if (!ZKUtil.setData(zkw, node, data.getBytes(), version)) { - return false; + RegionTransitionData curDataInZNode = ZKAssign.getDataNoWatch(zkw, region + .getEncodedName(), stat); + // Do not move the node to OFFLINE if znode is in any of the following + // state. + // Because these are already executed states. + if (hijack && null != curDataInZNode) { + EventType eventType = curDataInZNode.getEventType(); + if (eventType.equals(EventType.RS_ZK_REGION_CLOSING) + || eventType.equals(EventType.RS_ZK_REGION_CLOSED) + || eventType.equals(EventType.RS_ZK_REGION_OPENED)) { + return -1; + } + } + + boolean setData = false; + try { + setData = ZKUtil.setData(zkw, node, data.getBytes(), version); + // Setdata throws KeeperException which aborts the Master. So we are + // catching it here. + // If just before setting the znode to OFFLINE if the RS has made any + // change to the + // znode state then we need to return -1. + } catch (KeeperException kpe) { + LOG.info("Version mismatch while setting the node to OFFLINE state."); + return -1; + } + if (!setData) { + return -1; } else { // We successfully forced to OFFLINE, reset watch and handle if // the state changed in between our set and the watch RegionTransitionData curData = - ZKAssign.getData(zkw, region.getEncodedName()); + ZKAssign.getData(zkw, region.getEncodedName()); if (curData.getEventType() != data.getEventType()) { // state changed, need to process - return false; + return -1; } } } - return true; + return stat.getVersion() + 1; } /** @@ -673,6 +744,18 @@ public class ZKAssign { "the node existed but was version " + stat.getVersion() + " not the expected version " + expectedVersion)); return -1; + } else if (beginState.equals(EventType.M_ZK_REGION_OFFLINE) + && endState.equals(EventType.RS_ZK_REGION_OPENING) + && expectedVersion == -1 && stat.getVersion() != 0) { + // the below check ensures that double assignment doesnot happen. + // When the node is created for the first time then the expected version + // that is passed will be -1 and the version in znode will be 0. + // In all other cases the version in znode will be > 0. + LOG.warn(zkw.prefix("Attempt to transition the " + "unassigned node for " + + encoded + " from " + beginState + " to " + endState + " failed, " + + "the node existed but was version " + stat.getVersion() + + " not the expected version " + expectedVersion)); + return -1; } // Verify it is in expected state