HBASE-10962 Decouple region opening (HM and HRS) from ZK (Mikhail Antonov)

This commit is contained in:
Michael Stack 2014-06-06 07:46:00 -07:00
parent 6ce225b1d6
commit 623cfa33d1
14 changed files with 811 additions and 339 deletions

View File

@ -62,4 +62,9 @@ public abstract class BaseCoordinatedStateManager implements CoordinatedStateMan
* Method to retrieve coordination for closing region operations.
*/
public abstract CloseRegionCoordination getCloseRegionCoordination();
/**
* Method to retrieve coordination for opening region operations.
*/
public abstract OpenRegionCoordination getOpenRegionCoordination();
}

View File

@ -0,0 +1,129 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coordination;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import java.io.IOException;
/**
* Cocoordination operations for opening regions.
*/
@InterfaceAudience.Private
public interface OpenRegionCoordination {
//---------------------
// RS-side operations
//---------------------
/**
* Tries to move regions to OPENED state.
*
* @param r Region we're working on.
* @param ord details about region opening task
* @return whether transition was successful or not
* @throws java.io.IOException
*/
boolean transitionToOpened(HRegion r, OpenRegionDetails ord) throws IOException;
/**
* Transitions region from offline to opening state.
* @param regionInfo region we're working on.
* @param ord details about opening task.
* @return true if successful, false otherwise
*/
boolean transitionFromOfflineToOpening(HRegionInfo regionInfo,
OpenRegionDetails ord);
/**
* Heartbeats to prevent timeouts.
*
* @param ord details about opening task.
* @param regionInfo region we're working on.
* @param rsServices instance of RegionServerrServices
* @param context used for logging purposes only
* @return true if successful heartbeat, false otherwise.
*/
boolean tickleOpening(OpenRegionDetails ord, HRegionInfo regionInfo,
RegionServerServices rsServices, String context);
/**
* Tries transition region from offline to failed open.
* @param rsServices instance of RegionServerServices
* @param hri region we're working on
* @param ord details about region opening task
* @return true if successful, false otherwise
*/
boolean tryTransitionFromOfflineToFailedOpen(RegionServerServices rsServices,
HRegionInfo hri, OpenRegionDetails ord);
/**
* Tries transition from Opening to Failed open.
* @param hri region we're working on
* @param ord details about region opening task
* @return true if successfu. false otherwise.
*/
boolean tryTransitionFromOpeningToFailedOpen(HRegionInfo hri, OpenRegionDetails ord);
/**
* Construct OpenRegionDetails instance from part of protobuf request.
* @return instance of OpenRegionDetails.
*/
OpenRegionDetails parseFromProtoRequest(AdminProtos.OpenRegionRequest.RegionOpenInfo
regionOpenInfo);
/**
* Get details object with params for case when we're opening on
* regionserver side with all "default" properties.
*/
OpenRegionDetails getDetailsForNonCoordinatedOpening();
//-------------------------
// HMaster-side operations
//-------------------------
/**
* Commits opening operation on HM side (steps required for "commit"
* are determined by coordination implementation).
* @return true if committed successfully, false otherwise.
*/
public boolean commitOpenOnMasterSide(AssignmentManager assignmentManager,
HRegionInfo regionInfo,
OpenRegionDetails ord);
/**
* Interface for region opening tasks. Used to carry implementation details in
* encapsulated way through Handlers to the coordination API.
*/
static interface OpenRegionDetails {
/**
* Sets server name on which opening operation is running.
*/
void setServerName(ServerName serverName);
/**
* @return server name on which opening op is running.
*/
ServerName getServerName();
}
}

View File

@ -37,6 +37,7 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
protected ZooKeeperWatcher watcher;
protected SplitTransactionCoordination splitTransactionCoordination;
protected CloseRegionCoordination closeRegionCoordination;
protected OpenRegionCoordination openRegionCoordination;
@Override
public void initialize(Server server) {
@ -45,6 +46,7 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
splitTransactionCoordination = new ZKSplitTransactionCoordination(this, watcher);
closeRegionCoordination = new ZkCloseRegionCoordination(this, watcher);
openRegionCoordination = new ZkOpenRegionCoordination(this, watcher);
}
@Override
@ -71,4 +73,9 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
public CloseRegionCoordination getCloseRegionCoordination() {
return closeRegionCoordination;
}
@Override
public OpenRegionCoordination getOpenRegionCoordination() {
return openRegionCoordination;
}
}

View File

@ -0,0 +1,414 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coordination;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
/**
* ZK-based implementation of {@link OpenRegionCoordination}.
*/
@InterfaceAudience.Private
public class ZkOpenRegionCoordination implements OpenRegionCoordination {
private static final Log LOG = LogFactory.getLog(ZkOpenRegionCoordination.class);
private CoordinatedStateManager coordination;
private final ZooKeeperWatcher watcher;
public ZkOpenRegionCoordination(CoordinatedStateManager coordination,
ZooKeeperWatcher watcher) {
this.coordination = coordination;
this.watcher = watcher;
}
//-------------------------------
// Region Server-side operations
//-------------------------------
/**
* @param r Region we're working on.
* @return whether znode is successfully transitioned to OPENED state.
* @throws java.io.IOException
*/
@Override
public boolean transitionToOpened(final HRegion r, OpenRegionDetails ord) throws IOException {
ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord;
boolean result = false;
HRegionInfo hri = r.getRegionInfo();
final String name = hri.getRegionNameAsString();
// Finally, Transition ZK node to OPENED
try {
if (ZKAssign.transitionNodeOpened(watcher, hri,
zkOrd.getServerName(), zkOrd.getVersion()) == -1) {
String warnMsg = "Completed the OPEN of region " + name +
" but when transitioning from " + " OPENING to OPENED ";
try {
String node = ZKAssign.getNodeName(watcher, hri.getEncodedName());
if (ZKUtil.checkExists(watcher, node) < 0) {
// if the znode
coordination.getServer().abort(warnMsg + "the znode disappeared", null);
} else {
LOG.warn(warnMsg + "got a version mismatch, someone else clashed; " +
"so now unassigning -- closing region on server: " + zkOrd.getServerName());
}
} catch (KeeperException ke) {
coordination.getServer().abort(warnMsg, ke);
}
} else {
LOG.debug("Transitioned " + r.getRegionInfo().getEncodedName() +
" to OPENED in zk on " + zkOrd.getServerName());
result = true;
}
} catch (KeeperException e) {
LOG.error("Failed transitioning node " + name +
" from OPENING to OPENED -- closing region", e);
}
return result;
}
/**
* Transition ZK node from OFFLINE to OPENING.
* @param regionInfo region info instance
* @param ord - instance of open region details, for ZK implementation
* will include version Of OfflineNode that needs to be compared
* before changing the node's state from OFFLINE
* @return True if successful transition.
*/
@Override
public boolean transitionFromOfflineToOpening(HRegionInfo regionInfo,
OpenRegionDetails ord) {
ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord;
// encoded name is used as znode encoded name in ZK
final String encodedName = regionInfo.getEncodedName();
// TODO: should also handle transition from CLOSED?
try {
// Initialize the znode version.
zkOrd.setVersion(ZKAssign.transitionNode(watcher, regionInfo,
zkOrd.getServerName(), EventType.M_ZK_REGION_OFFLINE,
EventType.RS_ZK_REGION_OPENING, zkOrd.getVersionOfOfflineNode()));
} catch (KeeperException e) {
LOG.error("Error transition from OFFLINE to OPENING for region=" +
encodedName, e);
zkOrd.setVersion(-1);
return false;
}
boolean b = isGoodVersion(zkOrd);
if (!b) {
LOG.warn("Failed transition from OFFLINE to OPENING for region=" +
encodedName);
}
return b;
}
/**
* Update our OPENING state in zookeeper.
* Do this so master doesn't timeout this region-in-transition.
* We may lose the znode ownership during the open. Currently its
* too hard interrupting ongoing region open. Just let it complete
* and check we still have the znode after region open.
*
* @param context Some context to add to logs if failure
* @return True if successful transition.
*/
@Override
public boolean tickleOpening(OpenRegionDetails ord, HRegionInfo regionInfo,
RegionServerServices rsServices, final String context) {
ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord;
if (!isRegionStillOpening(regionInfo, rsServices)) {
LOG.warn("Open region aborted since it isn't opening any more");
return false;
}
// If previous checks failed... do not try again.
if (!isGoodVersion(zkOrd)) return false;
String encodedName = regionInfo.getEncodedName();
try {
zkOrd.setVersion(ZKAssign.confirmNodeOpening(watcher,
regionInfo, zkOrd.getServerName(), zkOrd.getVersion()));
} catch (KeeperException e) {
coordination.getServer().abort("Exception refreshing OPENING; region=" + encodedName +
", context=" + context, e);
zkOrd.setVersion(-1);
return false;
}
boolean b = isGoodVersion(zkOrd);
if (!b) {
LOG.warn("Failed refreshing OPENING; region=" + encodedName +
", context=" + context);
}
return b;
}
/**
* Try to transition to open.
*
* This is not guaranteed to succeed, we just do our best.
*
* @param rsServices
* @param hri Region we're working on.
* @param ord Details about region open task
* @return whether znode is successfully transitioned to FAILED_OPEN state.
*/
@Override
public boolean tryTransitionFromOfflineToFailedOpen(RegionServerServices rsServices,
final HRegionInfo hri,
OpenRegionDetails ord) {
ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord;
boolean result = false;
final String name = hri.getRegionNameAsString();
try {
LOG.info("Opening of region " + hri + " failed, transitioning" +
" from OFFLINE to FAILED_OPEN in ZK, expecting version " +
zkOrd.getVersionOfOfflineNode());
if (ZKAssign.transitionNode(
rsServices.getZooKeeper(), hri,
rsServices.getServerName(),
EventType.M_ZK_REGION_OFFLINE,
EventType.RS_ZK_REGION_FAILED_OPEN,
zkOrd.getVersionOfOfflineNode()) == -1) {
LOG.warn("Unable to mark region " + hri + " as FAILED_OPEN. " +
"It's likely that the master already timed out this open " +
"attempt, and thus another RS already has the region.");
} else {
result = true;
}
} catch (KeeperException e) {
LOG.error("Failed transitioning node " + name + " from OFFLINE to FAILED_OPEN", e);
}
return result;
}
private boolean isGoodVersion(ZkOpenRegionDetails zkOrd) {
return zkOrd.getVersion() != -1;
}
/**
* This is not guaranteed to succeed, we just do our best.
* @param hri Region we're working on.
* @return whether znode is successfully transitioned to FAILED_OPEN state.
*/
@Override
public boolean tryTransitionFromOpeningToFailedOpen(final HRegionInfo hri,
OpenRegionDetails ord) {
ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord;
boolean result = false;
final String name = hri.getRegionNameAsString();
try {
LOG.info("Opening of region " + hri + " failed, transitioning" +
" from OPENING to FAILED_OPEN in ZK, expecting version " + zkOrd.getVersion());
if (ZKAssign.transitionNode(
watcher, hri,
zkOrd.getServerName(),
EventType.RS_ZK_REGION_OPENING,
EventType.RS_ZK_REGION_FAILED_OPEN,
zkOrd.getVersion()) == -1) {
LOG.warn("Unable to mark region " + hri + " as FAILED_OPEN. " +
"It's likely that the master already timed out this open " +
"attempt, and thus another RS already has the region.");
} else {
result = true;
}
} catch (KeeperException e) {
LOG.error("Failed transitioning node " + name +
" from OPENING to FAILED_OPEN", e);
}
return result;
}
/**
* Parse ZK-related fields from request.
*/
@Override
public OpenRegionCoordination.OpenRegionDetails parseFromProtoRequest(
AdminProtos.OpenRegionRequest.RegionOpenInfo regionOpenInfo) {
ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd =
new ZkOpenRegionCoordination.ZkOpenRegionDetails();
int versionOfOfflineNode = -1;
if (regionOpenInfo.hasVersionOfOfflineNode()) {
versionOfOfflineNode = regionOpenInfo.getVersionOfOfflineNode();
}
zkCrd.setVersionOfOfflineNode(versionOfOfflineNode);
zkCrd.setServerName(coordination.getServer().getServerName());
return zkCrd;
}
/**
* No ZK tracking will be performed for that case.
* This method should be used when we want to construct CloseRegionDetails,
* but don't want any coordination on that (when it's initiated by regionserver),
* so no znode state transitions will be performed.
*/
@Override
public OpenRegionCoordination.OpenRegionDetails getDetailsForNonCoordinatedOpening() {
ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd =
new ZkOpenRegionCoordination.ZkOpenRegionDetails();
zkCrd.setVersionOfOfflineNode(-1);
zkCrd.setServerName(coordination.getServer().getServerName());
return zkCrd;
}
//--------------------------
// HMaster-side operations
//--------------------------
@Override
public boolean commitOpenOnMasterSide(AssignmentManager assignmentManager,
HRegionInfo regionInfo,
OpenRegionDetails ord) {
boolean committedSuccessfully = true;
// Code to defend against case where we get SPLIT before region open
// processing completes; temporary till we make SPLITs go via zk -- 0.92.
RegionState regionState = assignmentManager.getRegionStates()
.getRegionTransitionState(regionInfo.getEncodedName());
boolean openedNodeDeleted = false;
if (regionState != null && regionState.isOpened()) {
openedNodeDeleted = deleteOpenedNode(regionInfo, ord);
if (!openedNodeDeleted) {
LOG.error("Znode of region " + regionInfo.getShortNameToLog() + " could not be deleted.");
}
} else {
LOG.warn("Skipping the onlining of " + regionInfo.getShortNameToLog() +
" because regions is NOT in RIT -- presuming this is because it SPLIT");
}
if (!openedNodeDeleted) {
if (assignmentManager.getTableStateManager().isTableState(regionInfo.getTable(),
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
debugLog(regionInfo, "Opened region "
+ regionInfo.getShortNameToLog() + " but "
+ "this table is disabled, triggering close of region");
committedSuccessfully = false;
}
}
return committedSuccessfully;
}
private boolean deleteOpenedNode(HRegionInfo regionInfo, OpenRegionDetails ord) {
ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord;
int expectedVersion = zkOrd.getVersion();
debugLog(regionInfo, "Handling OPENED of " +
regionInfo.getShortNameToLog() + " from " + zkOrd.getServerName().toString() +
"; deleting unassigned node");
try {
// delete the opened znode only if the version matches.
return ZKAssign.deleteNode(this.coordination.getServer().getZooKeeper(),
regionInfo.getEncodedName(), EventType.RS_ZK_REGION_OPENED, expectedVersion);
} catch(KeeperException.NoNodeException e){
// Getting no node exception here means that already the region has been opened.
LOG.warn("The znode of the region " + regionInfo.getShortNameToLog() +
" would have already been deleted");
return false;
} catch (KeeperException e) {
this.coordination.getServer().abort("Error deleting OPENED node in ZK (" +
regionInfo.getRegionNameAsString() + ")", e);
}
return false;
}
private void debugLog(HRegionInfo region, String string) {
if (region.isMetaTable()) {
LOG.info(string);
} else {
LOG.debug(string);
}
}
// Additional classes and helper methods
/**
* ZK-based implementation. Has details about whether the state transition should be
* reflected in ZK, as well as expected version of znode.
*/
public static class ZkOpenRegionDetails implements OpenRegionCoordination.OpenRegionDetails {
// We get version of our znode at start of open process and monitor it across
// the total open. We'll fail the open if someone hijacks our znode; we can
// tell this has happened if version is not as expected.
private volatile int version = -1;
//version of the offline node that was set by the master
private volatile int versionOfOfflineNode = -1;
/**
* Server name the handler is running on.
*/
private ServerName serverName;
public ZkOpenRegionDetails() {
}
public ZkOpenRegionDetails(int versionOfOfflineNode) {
this.versionOfOfflineNode = versionOfOfflineNode;
}
public int getVersionOfOfflineNode() {
return versionOfOfflineNode;
}
public void setVersionOfOfflineNode(int versionOfOfflineNode) {
this.versionOfOfflineNode = versionOfOfflineNode;
}
public int getVersion() {
return version;
}
public void setVersion(int version) {
this.version = version;
}
@Override
public ServerName getServerName() {
return serverName;
}
@Override
public void setServerName(ServerName serverName) {
this.serverName = serverName;
}
}
private boolean isRegionStillOpening(HRegionInfo regionInfo, RegionServerServices rsServices) {
byte[] encodedName = regionInfo.getEncodedNameAsBytes();
Boolean action = rsServices.getRegionsInTransitionInRS().get(encodedName);
return Boolean.TRUE.equals(action); // true means opening for RIT
}
}

View File

@ -58,6 +58,8 @@ import org.apache.hadoop.hbase.TableStateManager;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
@ -75,6 +77,7 @@ import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
@ -577,8 +580,20 @@ public class AssignmentManager extends ZooKeeperListener {
return false;
}
}
// TODO: This code is tied to ZK anyway, so for now leaving it as is,
// will refactor when whole region assignment will be abstracted from ZK
BaseCoordinatedStateManager cp =
(BaseCoordinatedStateManager) this.server.getCoordinatedStateManager();
OpenRegionCoordination openRegionCoordination = cp.getOpenRegionCoordination();
ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
new ZkOpenRegionCoordination.ZkOpenRegionDetails();
zkOrd.setVersion(stat.getVersion());
zkOrd.setServerName(cp.getServer().getServerName());
return processRegionsInTransition(
rt, hri, stat.getVersion());
rt, hri, openRegionCoordination, zkOrd);
} finally {
lock.unlock();
}
@ -594,7 +609,8 @@ public class AssignmentManager extends ZooKeeperListener {
*/
boolean processRegionsInTransition(
final RegionTransition rt, final HRegionInfo regionInfo,
final int expectedVersion) throws KeeperException {
OpenRegionCoordination coordination,
final OpenRegionCoordination.OpenRegionDetails ord) throws KeeperException {
EventType et = rt.getEventType();
// Get ServerName. Could not be null.
final ServerName sn = rt.getServerName();
@ -652,6 +668,8 @@ public class AssignmentManager extends ZooKeeperListener {
public void process() throws IOException {
ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
try {
final int expectedVersion = ((ZkOpenRegionCoordination.ZkOpenRegionDetails) ord)
.getVersion();
unassign(regionInfo, rsClosing, expectedVersion, null, true, null);
if (regionStates.isRegionOffline(regionInfo)) {
assign(regionInfo, true);
@ -699,7 +717,7 @@ public class AssignmentManager extends ZooKeeperListener {
// This could be done asynchronously, we would need then to acquire the lock in the
// handler.
regionStates.updateRegionState(rt, State.OPEN);
new OpenedRegionHandler(server, this, regionInfo, sn, expectedVersion).process();
new OpenedRegionHandler(server, this, regionInfo, coordination, ord).process();
break;
case RS_ZK_REQUEST_REGION_SPLIT:
case RS_ZK_REGION_SPLITTING:
@ -748,10 +766,12 @@ public class AssignmentManager extends ZooKeeperListener {
* <p>
* This deals with skipped transitions (we got a CLOSED but didn't see CLOSING
* yet).
* @param rt
* @param expectedVersion
* @param rt region transition
* @param coordination coordination for opening region
* @param ord details about opening region
*/
void handleRegion(final RegionTransition rt, int expectedVersion) {
void handleRegion(final RegionTransition rt, OpenRegionCoordination coordination,
OpenRegionCoordination.OpenRegionDetails ord) {
if (rt == null) {
LOG.warn("Unexpected NULL input for RegionTransition rt");
return;
@ -931,7 +951,7 @@ public class AssignmentManager extends ZooKeeperListener {
if (regionState != null) {
failedOpenTracker.remove(encodedName); // reset the count, if any
new OpenedRegionHandler(
server, this, regionState.getRegion(), sn, expectedVersion).process();
server, this, regionState.getRegion(), coordination, ord).process();
updateOpenedRegionHandlerTracker(regionState.getRegion());
}
break;
@ -1299,7 +1319,19 @@ public class AssignmentManager extends ZooKeeperListener {
if (data == null) return;
RegionTransition rt = RegionTransition.parseFrom(data);
handleRegion(rt, stat.getVersion());
// TODO: This code is tied to ZK anyway, so for now leaving it as is,
// will refactor when whole region assignment will be abstracted from ZK
BaseCoordinatedStateManager csm =
(BaseCoordinatedStateManager) server.getCoordinatedStateManager();
OpenRegionCoordination openRegionCoordination = csm.getOpenRegionCoordination();
ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
new ZkOpenRegionCoordination.ZkOpenRegionDetails();
zkOrd.setVersion(stat.getVersion());
zkOrd.setServerName(csm.getServer().getServerName());
handleRegion(rt, openRegionCoordination, zkOrd);
} catch (KeeperException e) {
server.abort("Unexpected ZK exception reading unassigned node data", e);
} catch (DeserializationException e) {

View File

@ -18,21 +18,16 @@
*/
package org.apache.hadoop.hbase.master.handler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.zookeeper.KeeperException;
/**
* Handles OPENED region event on Master.
@ -42,9 +37,10 @@ public class OpenedRegionHandler extends EventHandler implements TotesHRegionInf
private static final Log LOG = LogFactory.getLog(OpenedRegionHandler.class);
private final AssignmentManager assignmentManager;
private final HRegionInfo regionInfo;
private final ServerName sn;
private final OpenedPriority priority;
private final int expectedVersion;
private OpenRegionCoordination coordination;
private OpenRegionCoordination.OpenRegionDetails ord;
private enum OpenedPriority {
META (1),
@ -62,12 +58,13 @@ public class OpenedRegionHandler extends EventHandler implements TotesHRegionInf
public OpenedRegionHandler(Server server,
AssignmentManager assignmentManager, HRegionInfo regionInfo,
ServerName sn, int expectedVersion) {
OpenRegionCoordination coordination,
OpenRegionCoordination.OpenRegionDetails ord) {
super(server, EventType.RS_ZK_REGION_OPENED);
this.assignmentManager = assignmentManager;
this.regionInfo = regionInfo;
this.sn = sn;
this.expectedVersion = expectedVersion;
this.coordination = coordination;
this.ord = ord;
if(regionInfo.isMetaRegion()) {
priority = OpenedPriority.META;
} else if(regionInfo.getTable()
@ -99,56 +96,8 @@ public class OpenedRegionHandler extends EventHandler implements TotesHRegionInf
@Override
public void process() {
// Code to defend against case where we get SPLIT before region open
// processing completes; temporary till we make SPLITs go via zk -- 0.92.
RegionState regionState = this.assignmentManager.getRegionStates()
.getRegionTransitionState(regionInfo.getEncodedName());
boolean openedNodeDeleted = false;
if (regionState != null && regionState.isOpened()) {
openedNodeDeleted = deleteOpenedNode(expectedVersion);
if (!openedNodeDeleted) {
LOG.error("Znode of region " + regionInfo.getShortNameToLog() + " could not be deleted.");
}
} else {
LOG.warn("Skipping the onlining of " + regionInfo.getShortNameToLog() +
" because regions is NOT in RIT -- presuming this is because it SPLIT");
}
if (!openedNodeDeleted) {
if (this.assignmentManager.getTableStateManager().isTableState(regionInfo.getTable(),
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
debugLog(regionInfo, "Opened region "
+ regionInfo.getShortNameToLog() + " but "
+ "this table is disabled, triggering close of region");
if (!coordination.commitOpenOnMasterSide(assignmentManager,regionInfo, ord)) {
assignmentManager.unassign(regionInfo);
}
}
}
private boolean deleteOpenedNode(int expectedVersion) {
debugLog(regionInfo, "Handling OPENED of " +
this.regionInfo.getShortNameToLog() + " from " + this.sn.toString() +
"; deleting unassigned node");
try {
// delete the opened znode only if the version matches.
return ZKAssign.deleteNode(server.getZooKeeper(),
regionInfo.getEncodedName(), EventType.RS_ZK_REGION_OPENED, expectedVersion);
} catch(KeeperException.NoNodeException e){
// Getting no node exception here means that already the region has been opened.
LOG.warn("The znode of the region " + regionInfo.getShortNameToLog() +
" would have already been deleted");
return false;
} catch (KeeperException e) {
server.abort("Error deleting OPENED node in ZK (" +
regionInfo.getRegionNameAsString() + ")", e);
}
return false;
}
private void debugLog(HRegionInfo region, String string) {
if (region.isMetaTable()) {
LOG.info(string);
} else {
LOG.debug(string);
}
}
}

View File

@ -141,6 +141,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Re
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@ -1188,11 +1189,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
final boolean isBulkAssign = regionCount > 1;
for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion());
OpenRegionCoordination coordination = regionServer.getCoordinatedStateManager().
getOpenRegionCoordination();
OpenRegionCoordination.OpenRegionDetails ord =
coordination.parseFromProtoRequest(regionOpenInfo);
int versionOfOfflineNode = -1;
if (regionOpenInfo.hasVersionOfOfflineNode()) {
versionOfOfflineNode = regionOpenInfo.getVersionOfOfflineNode();
}
HTableDescriptor htd;
try {
final HRegion onlineRegion = regionServer.getFromOnlineRegions(region.getEncodedName());
@ -1237,8 +1238,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (Boolean.FALSE.equals(previous)) {
// There is a close in progress. We need to mark this open as failed in ZK.
OpenRegionHandler.
tryTransitionFromOfflineToFailedOpen(regionServer, region, versionOfOfflineNode);
coordination.tryTransitionFromOfflineToFailedOpen(regionServer, region, ord);
throw new RegionAlreadyInTransitionException("Received OPEN for the region:"
+ region.getRegionNameAsString() + " , which we are already trying to CLOSE ");
@ -1266,12 +1267,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// Need to pass the expected version in the constructor.
if (region.isMetaRegion()) {
regionServer.service.submit(new OpenMetaHandler(
regionServer, regionServer, region, htd, versionOfOfflineNode));
regionServer, regionServer, region, htd, coordination, ord));
} else {
regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
regionOpenInfo.getFavoredNodesList());
regionServer.service.submit(new OpenRegionHandler(
regionServer, regionServer, region, htd, versionOfOfflineNode));
regionServer, regionServer, region, htd, coordination, ord));
}
}

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
/**
* Handles opening of a meta region on a region server.
@ -34,13 +35,9 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
public class OpenMetaHandler extends OpenRegionHandler {
public OpenMetaHandler(final Server server,
final RegionServerServices rsServices, HRegionInfo regionInfo,
final HTableDescriptor htd) {
this(server, rsServices, regionInfo, htd, -1);
}
public OpenMetaHandler(final Server server,
final RegionServerServices rsServices, HRegionInfo regionInfo,
final HTableDescriptor htd, int versionOfOfflineNode) {
final HTableDescriptor htd, OpenRegionCoordination coordination,
OpenRegionCoordination.OpenRegionDetails ord) {
super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_META,
versionOfOfflineNode);
coordination, ord);
}
}

View File

@ -32,10 +32,9 @@ import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.zookeeper.KeeperException;
/**
* Handles opening of a region on a region server.
* <p>
@ -50,34 +49,27 @@ public class OpenRegionHandler extends EventHandler {
private final HRegionInfo regionInfo;
private final HTableDescriptor htd;
// We get version of our znode at start of open process and monitor it across
// the total open. We'll fail the open if someone hijacks our znode; we can
// tell this has happened if version is not as expected.
private volatile int version = -1;
//version of the offline node that was set by the master
private volatile int versionOfOfflineNode = -1;
private OpenRegionCoordination coordination;
private OpenRegionCoordination.OpenRegionDetails ord;
public OpenRegionHandler(final Server server,
final RegionServerServices rsServices, HRegionInfo regionInfo,
HTableDescriptor htd) {
this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION, -1);
}
public OpenRegionHandler(final Server server,
final RegionServerServices rsServices, HRegionInfo regionInfo,
HTableDescriptor htd, int versionOfOfflineNode) {
HTableDescriptor htd, OpenRegionCoordination coordination,
OpenRegionCoordination.OpenRegionDetails ord) {
this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION,
versionOfOfflineNode);
coordination, ord);
}
protected OpenRegionHandler(final Server server,
final RegionServerServices rsServices, final HRegionInfo regionInfo,
final HTableDescriptor htd, EventType eventType,
final int versionOfOfflineNode) {
OpenRegionCoordination coordination, OpenRegionCoordination.OpenRegionDetails ord) {
super(server, eventType);
this.rsServices = rsServices;
this.regionInfo = regionInfo;
this.htd = htd;
this.versionOfOfflineNode = versionOfOfflineNode;
this.coordination = coordination;
this.ord = ord;
}
public HRegionInfo getRegionInfo() {
@ -112,13 +104,13 @@ public class OpenRegionHandler extends EventHandler {
// Check that we're still supposed to open the region and transition.
// If fails, just return. Someone stole the region from under us.
// Calling transitionZookeeperOfflineToOpening initializes this.version.
// Calling transitionFromOfflineToOpening initializes this.version.
if (!isRegionStillOpening()){
LOG.error("Region " + encodedName + " opening cancelled");
return;
}
if (!transitionZookeeperOfflineToOpening(encodedName, versionOfOfflineNode)) {
if (!coordination.transitionFromOfflineToOpening(regionInfo, ord)) {
LOG.warn("Region was hijacked? Opening cancelled for encodedName=" + encodedName);
// This is a desperate attempt: the znode is unlikely to be ours. But we can't do more.
return;
@ -132,7 +124,7 @@ public class OpenRegionHandler extends EventHandler {
}
boolean failed = true;
if (tickleOpening("post_region_open")) {
if (coordination.tickleOpening(ord, regionInfo, rsServices, "post_region_open")) {
if (updateMeta(region)) {
failed = false;
}
@ -142,8 +134,7 @@ public class OpenRegionHandler extends EventHandler {
return;
}
if (!isRegionStillOpening() || !transitionToOpened(region)) {
if (!isRegionStillOpening() || !coordination.transitionToOpened(region, ord)) {
// If we fail to transition to opened, it's because of one of two cases:
// (a) we lost our ZK lease
// OR (b) someone else opened the region before us
@ -173,7 +164,7 @@ public class OpenRegionHandler extends EventHandler {
} finally {
// Do all clean up here
if (!openSuccessful) {
doCleanUpOnFailedOpen(region, transitionedToOpening);
doCleanUpOnFailedOpen(region, transitionedToOpening, ord);
}
final Boolean current = this.rsServices.getRegionsInTransitionInRS().
remove(this.regionInfo.getEncodedNameAsBytes());
@ -200,7 +191,8 @@ public class OpenRegionHandler extends EventHandler {
}
}
private void doCleanUpOnFailedOpen(HRegion region, boolean transitionedToOpening)
private void doCleanUpOnFailedOpen(HRegion region, boolean transitionedToOpening,
OpenRegionCoordination.OpenRegionDetails ord)
throws IOException {
if (transitionedToOpening) {
try {
@ -210,12 +202,12 @@ public class OpenRegionHandler extends EventHandler {
} finally {
// Even if cleanupFailed open fails we need to do this transition
// See HBASE-7698
tryTransitionFromOpeningToFailedOpen(regionInfo);
coordination.tryTransitionFromOpeningToFailedOpen(regionInfo, ord);
}
} else {
// If still transition to OPENING is not done, we need to transition znode
// to FAILED_OPEN
tryTransitionFromOfflineToFailedOpen(this.rsServices, regionInfo, versionOfOfflineNode);
coordination.tryTransitionFromOfflineToFailedOpen(this.rsServices, regionInfo, ord);
}
}
@ -250,7 +242,7 @@ public class OpenRegionHandler extends EventHandler {
if (elapsed > 120000) { // 2 minutes, no need to tickleOpening too often
// Only tickle OPENING if postOpenDeployTasks is taking some time.
lastUpdate = now;
tickleOpening = tickleOpening("post_open_deploy");
tickleOpening = coordination.tickleOpening(ord, regionInfo, rsServices, "post_open_deploy");
}
synchronized (signaller) {
try {
@ -314,7 +306,7 @@ public class OpenRegionHandler extends EventHandler {
try {
this.services.postOpenDeployTasks(this.region,
this.server.getCatalogTracker());
} catch (KeeperException e) {
} catch (IOException e) {
server.abort("Exception running postOpenDeployTasks; region=" +
this.region.getRegionInfo().getEncodedName(), e);
} catch (Throwable e) {
@ -337,131 +329,22 @@ public class OpenRegionHandler extends EventHandler {
}
}
/**
* @param r Region we're working on.
* @return whether znode is successfully transitioned to OPENED state.
* @throws IOException
*/
boolean transitionToOpened(final HRegion r) throws IOException {
boolean result = false;
HRegionInfo hri = r.getRegionInfo();
final String name = hri.getRegionNameAsString();
// Finally, Transition ZK node to OPENED
try {
if (ZKAssign.transitionNodeOpened(this.server.getZooKeeper(), hri,
this.server.getServerName(), this.version) == -1) {
String warnMsg = "Completed the OPEN of region " + name +
" but when transitioning from " + " OPENING to OPENED ";
try {
String node = ZKAssign.getNodeName(this.server.getZooKeeper(), hri.getEncodedName());
if (ZKUtil.checkExists(this.server.getZooKeeper(), node) < 0) {
// if the znode
rsServices.abort(warnMsg + "the znode disappeared", null);
} else {
LOG.warn(warnMsg + "got a version mismatch, someone else clashed; " +
"so now unassigning -- closing region on server: " + this.server.getServerName());
}
} catch (KeeperException ke) {
rsServices.abort(warnMsg, ke);
}
} else {
LOG.debug("Transitioned " + r.getRegionInfo().getEncodedName() +
" to OPENED in zk on " + this.server.getServerName());
result = true;
}
} catch (KeeperException e) {
LOG.error("Failed transitioning node " + name +
" from OPENING to OPENED -- closing region", e);
}
return result;
}
/**
* This is not guaranteed to succeed, we just do our best.
* @param hri Region we're working on.
* @return whether znode is successfully transitioned to FAILED_OPEN state.
*/
private boolean tryTransitionFromOpeningToFailedOpen(final HRegionInfo hri) {
boolean result = false;
final String name = hri.getRegionNameAsString();
try {
LOG.info("Opening of region " + hri + " failed, transitioning" +
" from OPENING to FAILED_OPEN in ZK, expecting version " + this.version);
if (ZKAssign.transitionNode(
this.server.getZooKeeper(), hri,
this.server.getServerName(),
EventType.RS_ZK_REGION_OPENING,
EventType.RS_ZK_REGION_FAILED_OPEN,
this.version) == -1) {
LOG.warn("Unable to mark region " + hri + " as FAILED_OPEN. " +
"It's likely that the master already timed out this open " +
"attempt, and thus another RS already has the region.");
} else {
result = true;
}
} catch (KeeperException e) {
LOG.error("Failed transitioning node " + name +
" from OPENING to FAILED_OPEN", e);
}
return result;
}
/**
* Try to transition to open. This function is static to make it usable before creating the
* handler.
*
* This is not guaranteed to succeed, we just do our best.
*
* @param rsServices
* @param hri Region we're working on.
* @param versionOfOfflineNode version to checked.
* @return whether znode is successfully transitioned to FAILED_OPEN state.
*/
public static boolean tryTransitionFromOfflineToFailedOpen(RegionServerServices rsServices,
final HRegionInfo hri, final int versionOfOfflineNode) {
boolean result = false;
final String name = hri.getRegionNameAsString();
try {
LOG.info("Opening of region " + hri + " failed, transitioning" +
" from OFFLINE to FAILED_OPEN in ZK, expecting version " + versionOfOfflineNode);
if (ZKAssign.transitionNode(
rsServices.getZooKeeper(), hri,
rsServices.getServerName(),
EventType.M_ZK_REGION_OFFLINE,
EventType.RS_ZK_REGION_FAILED_OPEN,
versionOfOfflineNode) == -1) {
LOG.warn("Unable to mark region " + hri + " as FAILED_OPEN. " +
"It's likely that the master already timed out this open " +
"attempt, and thus another RS already has the region.");
} else {
result = true;
}
} catch (KeeperException e) {
LOG.error("Failed transitioning node " + name + " from OFFLINE to FAILED_OPEN", e);
}
return result;
}
/**
* @return Instance of HRegion if successful open else null.
*/
HRegion openRegion() {
HRegion region = null;
try {
// Instantiate the region. This also periodically tickles our zk OPENING
// Instantiate the region. This also periodically tickles OPENING
// state so master doesn't timeout this region in transition.
region = HRegion.openHRegion(this.regionInfo, this.htd,
this.rsServices.getWAL(this.regionInfo),
this.server.getConfiguration(),
this.rsServices,
this.rsServices.getWAL(this.regionInfo),
this.server.getConfiguration(),
this.rsServices,
new CancelableProgressable() {
public boolean progress() {
// We may lose the znode ownership during the open. Currently its
// too hard interrupting ongoing region open. Just let it complete
// and check we still have the znode after region open.
return tickleOpening("open_region_progress");
// if tickle failed, we need to cancel opening region.
return coordination.tickleOpening(ord, regionInfo, rsServices, "open_region_progress");
}
});
} catch (Throwable t) {
@ -495,70 +378,4 @@ public class OpenRegionHandler extends EventHandler {
Boolean action = rsServices.getRegionsInTransitionInRS().get(encodedName);
return Boolean.TRUE.equals(action); // true means opening for RIT
}
/**
* Transition ZK node from OFFLINE to OPENING.
* @param encodedName Name of the znode file (Region encodedName is the znode
* name).
* @param versionOfOfflineNode - version Of OfflineNode that needs to be compared
* before changing the node's state from OFFLINE
* @return True if successful transition.
*/
boolean transitionZookeeperOfflineToOpening(final String encodedName,
int versionOfOfflineNode) {
// TODO: should also handle transition from CLOSED?
try {
// Initialize the znode version.
this.version = ZKAssign.transitionNode(server.getZooKeeper(), regionInfo,
server.getServerName(), EventType.M_ZK_REGION_OFFLINE,
EventType.RS_ZK_REGION_OPENING, versionOfOfflineNode);
} catch (KeeperException e) {
LOG.error("Error transition from OFFLINE to OPENING for region=" +
encodedName, e);
this.version = -1;
return false;
}
boolean b = isGoodVersion();
if (!b) {
LOG.warn("Failed transition from OFFLINE to OPENING for region=" +
encodedName);
}
return b;
}
/**
* Update our OPENING state in zookeeper.
* Do this so master doesn't timeout this region-in-transition.
* @param context Some context to add to logs if failure
* @return True if successful transition.
*/
boolean tickleOpening(final String context) {
if (!isRegionStillOpening()) {
LOG.warn("Open region aborted since it isn't opening any more");
return false;
}
// If previous checks failed... do not try again.
if (!isGoodVersion()) return false;
String encodedName = this.regionInfo.getEncodedName();
try {
this.version =
ZKAssign.confirmNodeOpening(server.getZooKeeper(),
this.regionInfo, this.server.getServerName(), this.version);
} catch (KeeperException e) {
server.abort("Exception refreshing OPENING; region=" + encodedName +
", context=" + context, e);
this.version = -1;
return false;
}
boolean b = isGoodVersion();
if (!b) {
LOG.warn("Failed refreshing OPENING; region=" + encodedName +
", context=" + context);
}
return b;
}
private boolean isGoodVersion() {
return this.version != -1;
}
}

View File

@ -54,6 +54,9 @@ import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;
@ -876,9 +879,19 @@ public class TestAssignmentManager {
am.getRegionStates().createRegionState(REGIONINFO);
am.gate.set(false);
CatalogTracker ct = Mockito.mock(CatalogTracker.class);
assertFalse(am.processRegionsInTransition(rt, REGIONINFO, version));
am.getTableStateManager().setTableState(REGIONINFO.getTable(),
Table.State.ENABLED);
BaseCoordinatedStateManager cp = new ZkCoordinatedStateManager();
cp.initialize(server);
cp.start();
OpenRegionCoordination orc = cp.getOpenRegionCoordination();
ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
new ZkOpenRegionCoordination.ZkOpenRegionDetails();
zkOrd.setServerName(server.getServerName());
zkOrd.setVersion(version);
assertFalse(am.processRegionsInTransition(rt, REGIONINFO, orc, zkOrd));
am.getTableStateManager().setTableState(REGIONINFO.getTable(), Table.State.ENABLED);
processServerShutdownHandler(ct, am, false);
// Waiting for the assignment to get completed.
while (!am.gate.get()) {
@ -1357,8 +1370,9 @@ public class TestAssignmentManager {
this.serverManager, ct, balancer, null, null, master.getTableLockManager()) {
@Override
void handleRegion(final RegionTransition rt, int expectedVersion) {
super.handleRegion(rt, expectedVersion);
void handleRegion(final RegionTransition rt, OpenRegionCoordination coordination,
OpenRegionCoordination.OpenRegionDetails ord) {
super.handleRegion(rt, coordination, ord);
if (rt != null && Bytes.equals(hri.getRegionName(),
rt.getRegionName()) && rt.getEventType() == EventType.RS_ZK_REGION_OPENING) {
zkEventProcessed.set(true);

View File

@ -31,6 +31,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
import org.apache.hadoop.hbase.regionserver.HRegion;
@ -137,8 +141,17 @@ public class TestOpenedRegionHandler {
ZKUtil.getDataAndWatch(zkw, nodeName, stat);
// use the version for the OpenedRegionHandler
BaseCoordinatedStateManager csm = new ZkCoordinatedStateManager();
csm.initialize(server);
csm.start();
OpenRegionCoordination orc = csm.getOpenRegionCoordination();
ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
new ZkOpenRegionCoordination.ZkOpenRegionDetails();
zkOrd.setServerName(server.getServerName());
zkOrd.setVersion(stat.getVersion());
OpenedRegionHandler handler = new OpenedRegionHandler(server, am, region
.getRegionInfo(), server.getServerName(), stat.getVersion());
.getRegionInfo(), orc, zkOrd);
// Once again overwrite the same znode so that the version changes.
ZKAssign.transitionNode(zkw, region.getRegionInfo(), server
.getServerName(), EventType.RS_ZK_REGION_OPENED,

View File

@ -31,6 +31,9 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
@ -341,7 +344,18 @@ public class TestRegionServerNoMaster {
// Let's start the open handler
HTableDescriptor htd = getRS().tableDescriptors.get(hri.getTable());
getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd, 0));
BaseCoordinatedStateManager csm = new ZkCoordinatedStateManager();
csm.initialize(getRS());
csm.start();
ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd =
new ZkOpenRegionCoordination.ZkOpenRegionDetails();
zkCrd.setServerName(getRS().getServerName());
zkCrd.setVersionOfOfflineNode(0);
getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd,
csm.getOpenRegionCoordination(), zkCrd));
// The open handler should have removed the region from RIT but kept the region closed
checkRegionIsClosed();
@ -395,7 +409,18 @@ public class TestRegionServerNoMaster {
// 2) The region in RIT was changed.
// The order is more or less implementation dependant.
HTableDescriptor htd = getRS().tableDescriptors.get(hri.getTable());
getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd, 0));
BaseCoordinatedStateManager csm = new ZkCoordinatedStateManager();
csm.initialize(getRS());
csm.start();
ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd =
new ZkOpenRegionCoordination.ZkOpenRegionDetails();
zkCrd.setServerName(getRS().getServerName());
zkCrd.setVersionOfOfflineNode(0);
getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd,
csm.getOpenRegionCoordination(), zkCrd));
// The open handler should have removed the region from RIT but kept the region closed
checkRegionIsClosed();

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.executor.EventType;
@ -154,8 +155,12 @@ public class TestCloseRegionHandler {
HTableDescriptor htd = TEST_HTD;
final HRegionInfo hri = TEST_HRI;
ZkCoordinatedStateManager coordinationProvider = new ZkCoordinatedStateManager();
coordinationProvider.initialize(server);
coordinationProvider.start();
// open a region first so that it can be closed later
OpenRegion(server, rss, htd, hri);
OpenRegion(server, rss, htd, hri, coordinationProvider.getOpenRegionCoordination());
// close the region
// Create it CLOSING, which is what Master set before sending CLOSE RPC
@ -166,17 +171,13 @@ public class TestCloseRegionHandler {
// Given it is set to invalid versionOfClosingNode+1,
// CloseRegionHandler should be M_ZK_REGION_CLOSING
ZkCoordinatedStateManager consensusProvider = new ZkCoordinatedStateManager();
consensusProvider.initialize(server);
consensusProvider.start();
ZkCloseRegionCoordination.ZkCloseRegionDetails zkCrd =
new ZkCloseRegionCoordination.ZkCloseRegionDetails();
zkCrd.setPublishStatusInZk(true);
zkCrd.setExpectedVersion(versionOfClosingNode+1);
CloseRegionHandler handler = new CloseRegionHandler(server, rss, hri, false,
consensusProvider.getCloseRegionCoordination(), zkCrd);
coordinationProvider.getCloseRegionCoordination(), zkCrd);
handler.process();
// Handler should remain in M_ZK_REGION_CLOSING
@ -200,8 +201,12 @@ public class TestCloseRegionHandler {
HTableDescriptor htd = TEST_HTD;
HRegionInfo hri = TEST_HRI;
ZkCoordinatedStateManager coordinationProvider = new ZkCoordinatedStateManager();
coordinationProvider.initialize(server);
coordinationProvider.start();
// open a region first so that it can be closed later
OpenRegion(server, rss, htd, hri);
OpenRegion(server, rss, htd, hri, coordinationProvider.getOpenRegionCoordination());
// close the region
// Create it CLOSING, which is what Master set before sending CLOSE RPC
@ -212,17 +217,13 @@ public class TestCloseRegionHandler {
// Given it is set to correct versionOfClosingNode,
// CloseRegionHandlerit should be RS_ZK_REGION_CLOSED
ZkCoordinatedStateManager consensusProvider = new ZkCoordinatedStateManager();
consensusProvider.initialize(server);
consensusProvider.start();
ZkCloseRegionCoordination.ZkCloseRegionDetails zkCrd =
new ZkCloseRegionCoordination.ZkCloseRegionDetails();
zkCrd.setPublishStatusInZk(true);
zkCrd.setExpectedVersion(versionOfClosingNode);
CloseRegionHandler handler = new CloseRegionHandler(server, rss, hri, false,
consensusProvider.getCloseRegionCoordination(), zkCrd);
coordinationProvider.getCloseRegionCoordination(), zkCrd);
handler.process();
// Handler should have transitioned it to RS_ZK_REGION_CLOSED
RegionTransition rt = RegionTransition.parseFrom(
@ -231,11 +232,15 @@ public class TestCloseRegionHandler {
}
private void OpenRegion(Server server, RegionServerServices rss,
HTableDescriptor htd, HRegionInfo hri)
HTableDescriptor htd, HRegionInfo hri, OpenRegionCoordination coordination)
throws IOException, NodeExistsException, KeeperException, DeserializationException {
// Create it OFFLINE node, which is what Master set before sending OPEN RPC
ZKAssign.createNodeOffline(server.getZooKeeper(), hri, server.getServerName());
OpenRegionHandler openHandler = new OpenRegionHandler(server, rss, hri, htd);
OpenRegionCoordination.OpenRegionDetails ord =
coordination.getDetailsForNonCoordinatedOpening();
OpenRegionHandler openHandler =
new OpenRegionHandler(server, rss, hri, htd, coordination, ord);
rss.getRegionsInTransitionInRS().put(hri.getEncodedNameAsBytes(), Boolean.TRUE);
openHandler.process();
// This parse is not used?

View File

@ -25,6 +25,8 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@ -96,7 +98,16 @@ public class TestOpenRegionHandler {
.getConfiguration(), htd);
assertNotNull(region);
try {
OpenRegionHandler handler = new OpenRegionHandler(server, rss, hri, htd) {
ZkCoordinatedStateManager csm = new ZkCoordinatedStateManager();
csm.initialize(server);
csm.start();
ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd =
new ZkOpenRegionCoordination.ZkOpenRegionDetails();
zkCrd.setServerName(server.getServerName());
OpenRegionHandler handler = new OpenRegionHandler(server, rss, hri,
htd, csm.getOpenRegionCoordination(), zkCrd) {
HRegion openRegion() {
// Open region first, then remove znode as though it'd been hijacked.
HRegion region = super.openRegion();
@ -150,10 +161,22 @@ public class TestOpenRegionHandler {
.getConfiguration(), htd);
assertNotNull(region);
try {
OpenRegionHandler handler = new OpenRegionHandler(server, rss, hri, htd) {
boolean transitionToOpened(final HRegion r) throws IOException {
ZkCoordinatedStateManager csm = new ZkCoordinatedStateManager();
csm.initialize(server);
csm.start();
ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd =
new ZkOpenRegionCoordination.ZkOpenRegionDetails();
zkCrd.setServerName(server.getServerName());
ZkOpenRegionCoordination openRegionCoordination =
new ZkOpenRegionCoordination(csm, server.getZooKeeper()) {
@Override
public boolean transitionToOpened(final HRegion r, OpenRegionDetails ord)
throws IOException {
// remove znode simulating intermittent zookeeper connection issue
ZooKeeperWatcher zkw = this.server.getZooKeeper();
ZooKeeperWatcher zkw = server.getZooKeeper();
String node = ZKAssign.getNodeName(zkw, hri.getEncodedName());
try {
ZKUtil.deleteNodeFailSilent(zkw, node);
@ -161,9 +184,12 @@ public class TestOpenRegionHandler {
throw new RuntimeException("Ugh failed delete of " + node, e);
}
// then try to transition to OPENED
return super.transitionToOpened(r);
return super.transitionToOpened(r, ord);
}
};
OpenRegionHandler handler = new OpenRegionHandler(server, rss, hri, htd,
openRegionCoordination, zkCrd);
rss.getRegionsInTransitionInRS().put(
hri.getEncodedNameAsBytes(), Boolean.TRUE);
// Call process without first creating OFFLINE region in zk, see if
@ -182,7 +208,7 @@ public class TestOpenRegionHandler {
// Region server is expected to abort due to OpenRegionHandler perceiving transitioning
// to OPENED as failed
// This was corresponding to the second handler.process() call above.
assertTrue("region server should have aborted", rss.isAborted());
assertTrue("region server should have aborted", server.isAborted());
}
@Test
@ -193,9 +219,18 @@ public class TestOpenRegionHandler {
// Create it OFFLINE, which is what it expects
ZKAssign.createNodeOffline(server.getZooKeeper(), TEST_HRI, server.getServerName());
ZkCoordinatedStateManager csm = new ZkCoordinatedStateManager();
csm.initialize(server);
csm.start();
ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd =
new ZkOpenRegionCoordination.ZkOpenRegionDetails();
zkCrd.setServerName(server.getServerName());
// Create the handler
OpenRegionHandler handler =
new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD) {
new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD,
csm.getOpenRegionCoordination(), zkCrd) {
@Override
HRegion openRegion() {
// Fake failure of opening a region due to an IOE, which is caught
@ -221,8 +256,16 @@ public class TestOpenRegionHandler {
ZKAssign.createNodeOffline(server.getZooKeeper(), TEST_HRI, server.getServerName());
// Create the handler
OpenRegionHandler handler =
new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD) {
ZkCoordinatedStateManager csm = new ZkCoordinatedStateManager();
csm.initialize(server);
csm.start();
ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd =
new ZkOpenRegionCoordination.ZkOpenRegionDetails();
zkCrd.setServerName(server.getServerName());
OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD,
csm.getOpenRegionCoordination(), zkCrd) {
@Override
boolean updateMeta(final HRegion r) {
// Fake failure of updating META
@ -246,7 +289,16 @@ public class TestOpenRegionHandler {
// Create it OFFLINE, which is what it expects
ZKAssign.createNodeOffline(server.getZooKeeper(), TEST_HRI, server.getServerName());
// Create the handler
OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD) {
ZkCoordinatedStateManager csm = new ZkCoordinatedStateManager();
csm.initialize(server);
csm.start();
ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd =
new ZkOpenRegionCoordination.ZkOpenRegionDetails();
zkCrd.setServerName(server.getServerName());
OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD,
csm.getOpenRegionCoordination(), zkCrd) {
@Override
boolean updateMeta(HRegion r) {
return false;
@ -275,13 +327,25 @@ public class TestOpenRegionHandler {
// Create it OFFLINE, which is what it expects
ZKAssign.createNodeOffline(server.getZooKeeper(), TEST_HRI, server.getServerName());
// Create the handler
OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD) {
ZkCoordinatedStateManager csm = new ZkCoordinatedStateManager();
csm.initialize(server);
csm.start();
ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd =
new ZkOpenRegionCoordination.ZkOpenRegionDetails();
zkCrd.setServerName(server.getServerName());
ZkOpenRegionCoordination openRegionCoordination =
new ZkOpenRegionCoordination(csm, server.getZooKeeper()) {
@Override
boolean transitionZookeeperOfflineToOpening(String encodedName, int versionOfOfflineNode) {
public boolean transitionFromOfflineToOpening(HRegionInfo regionInfo,
OpenRegionDetails ord) {
return false;
}
};
OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD,
openRegionCoordination, zkCrd);
rsServices.getRegionsInTransitionInRS().put(TEST_HRI.getEncodedNameAsBytes(), Boolean.TRUE);
handler.process();