HBASE-7247 Assignment performances decreased by 50% because of regionserver.OpenRegionHandler#tickleOpening
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1465914 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
40d8060167
commit
bacb37c2b6
|
@ -585,15 +585,79 @@ public class ZKAssign {
|
|||
* @param zkw zk reference
|
||||
* @param region region to be transitioned to opening
|
||||
* @param serverName server transition happens on
|
||||
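* @param updateZNode if true, rewrite the znode with the OPENING state; if false, only verify that the znode still has the expected version and is still in the OPENING state, without writing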
|
||||
* @return version of node after transition, -1 if unsuccessful transition
|
||||
* @throws KeeperException if unexpected zookeeper exception
|
||||
*/
|
||||
public static int retransitionNodeOpening(ZooKeeperWatcher zkw,
|
||||
HRegionInfo region, ServerName serverName, int expectedVersion)
|
||||
HRegionInfo region, ServerName serverName, int expectedVersion, boolean updateZNode)
|
||||
throws KeeperException {
|
||||
return transitionNode(zkw, region, serverName,
|
||||
EventType.RS_ZK_REGION_OPENING,
|
||||
EventType.RS_ZK_REGION_OPENING, expectedVersion);
|
||||
|
||||
String encoded = region.getEncodedName();
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug(zkw.prefix("Attempting to retransition the opening state of node " +
|
||||
HRegionInfo.prettyPrint(encoded)));
|
||||
}
|
||||
|
||||
String node = getNodeName(zkw, encoded);
|
||||
zkw.sync(node);
|
||||
|
||||
// Read existing data of the node
|
||||
Stat stat = new Stat();
|
||||
byte [] existingBytes = ZKUtil.getDataNoWatch(zkw, node, stat);
|
||||
if (existingBytes == null) {
|
||||
// Node no longer exists. Return -1. It means unsuccessful transition.
|
||||
return -1;
|
||||
}
|
||||
RegionTransition rt = getRegionTransition(existingBytes);
|
||||
|
||||
// Verify it is the expected version
|
||||
if (expectedVersion != -1 && stat.getVersion() != expectedVersion) {
|
||||
LOG.warn(zkw.prefix("Attempt to retransition the opening state of the " +
|
||||
"unassigned node for " + encoded + " failed, " +
|
||||
"the node existed but was version " + stat.getVersion() +
|
||||
" not the expected version " + expectedVersion));
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Verify it is in expected state
|
||||
EventType et = rt.getEventType();
|
||||
if (!et.equals(EventType.RS_ZK_REGION_OPENING)) {
|
||||
String existingServer = (rt.getServerName() == null)
|
||||
? "<unknown>" : rt.getServerName().toString();
|
||||
LOG.warn(zkw.prefix("Attempt to retransition the opening state of the unassigned node for "
|
||||
+ encoded + " failed, the node existed but was in the state " + et +
|
||||
" set by the server " + existingServer));
|
||||
return -1;
|
||||
}
|
||||
|
||||
// We don't have to write the new state: the check is complete.
|
||||
if (!updateZNode){
|
||||
return expectedVersion;
|
||||
}
|
||||
|
||||
// Write new data, ensuring data has not changed since we last read it
|
||||
try {
|
||||
rt = RegionTransition.createRegionTransition(
|
||||
EventType.RS_ZK_REGION_OPENING, region.getRegionName(), serverName, null);
|
||||
if(!ZKUtil.setData(zkw, node, rt.toByteArray(), stat.getVersion())) {
|
||||
LOG.warn(zkw.prefix("Attempt to retransition the opening state of the " +
|
||||
"unassigned node for " + encoded + " failed, " +
|
||||
"the node existed and was in the expected state but then when " +
|
||||
"setting data we got a version mismatch"));
|
||||
return -1;
|
||||
}
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug(zkw.prefix("Successfully retransition the opening state of node " + encoded));
|
||||
}
|
||||
return stat.getVersion() + 1;
|
||||
} catch (KeeperException.NoNodeException nne) {
|
||||
LOG.warn(zkw.prefix("Attempt to retransition the opening state of the " +
|
||||
"unassigned node for " + encoded + " failed, " +
|
||||
"the node existed and was in the expected state but then when " +
|
||||
"setting data it no longer existed"));
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -712,6 +776,7 @@ public class ZKAssign {
|
|||
return transitionNode(zkw, region, serverName, beginState, endState, expectedVersion, null);
|
||||
}
|
||||
|
||||
|
||||
public static int transitionNode(ZooKeeperWatcher zkw, HRegionInfo region,
|
||||
ServerName serverName, EventType beginState, EventType endState,
|
||||
int expectedVersion, final byte [] payload)
|
||||
|
|
|
@ -50,6 +50,9 @@ public class OpenRegionHandler extends EventHandler {
|
|||
private final HRegionInfo regionInfo;
|
||||
private final HTableDescriptor htd;
|
||||
|
||||
private boolean tomActivated;
|
||||
private int assignmentTimeout;
|
||||
|
||||
// We get version of our znode at start of open process and monitor it across
|
||||
// the total open. We'll fail the open if someone hijacks our znode; we can
|
||||
// tell this has happened if version is not as expected.
|
||||
|
@ -78,6 +81,10 @@ public class OpenRegionHandler extends EventHandler {
|
|||
this.regionInfo = regionInfo;
|
||||
this.htd = htd;
|
||||
this.versionOfOfflineNode = versionOfOfflineNode;
|
||||
tomActivated = this.server.getConfiguration().
|
||||
getBoolean("hbase.assignment.timeout.management", false);
|
||||
assignmentTimeout = this.server.getConfiguration().
|
||||
getInt("hbase.master.assignment.timeoutmonitor.period", 10000);
|
||||
}
|
||||
|
||||
public HRegionInfo getRegionInfo() {
|
||||
|
@ -234,10 +241,6 @@ public class OpenRegionHandler extends EventHandler {
|
|||
PostOpenDeployTasksThread t = new PostOpenDeployTasksThread(r,
|
||||
this.server, this.rsServices, signaller);
|
||||
t.start();
|
||||
boolean tomActivated = this.server.getConfiguration().
|
||||
getBoolean("hbase.assignment.timeout.management", false);
|
||||
int assignmentTimeout = this.server.getConfiguration().
|
||||
getInt("hbase.master.assignment.timeoutmonitor.period", 10000);
|
||||
// Total timeout for meta edit. If we fail adding the edit then close out
|
||||
// the region and let it be assigned elsewhere.
|
||||
long timeout = assignmentTimeout * 10;
|
||||
|
@ -250,14 +253,12 @@ public class OpenRegionHandler extends EventHandler {
|
|||
boolean tickleOpening = true;
|
||||
while (!signaller.get() && t.isAlive() && !this.server.isStopped() &&
|
||||
!this.rsServices.isStopping() && (endTime > now)) {
|
||||
if (tomActivated) {
|
||||
long elapsed = now - lastUpdate;
|
||||
if (elapsed > period) {
|
||||
// Only tickle OPENING if postOpenDeployTasks is taking some time.
|
||||
lastUpdate = now;
|
||||
tickleOpening = tickleOpening("post_open_deploy");
|
||||
}
|
||||
}
|
||||
synchronized (signaller) {
|
||||
try {
|
||||
signaller.wait(period);
|
||||
|
@ -294,7 +295,7 @@ public class OpenRegionHandler extends EventHandler {
|
|||
* Thread to run region post open tasks. Call {@link #getException()} after
|
||||
* the thread finishes to check for exceptions running
|
||||
* {@link RegionServerServices#postOpenDeployTasks(
|
||||
* HRegion, org.apache.hadoop.hbase.catalog.CatalogTracker, boolean)}
|
||||
* HRegion, org.apache.hadoop.hbase.catalog.CatalogTracker)}
|
||||
* .
|
||||
*/
|
||||
static class PostOpenDeployTasksThread extends Thread {
|
||||
|
@ -532,7 +533,7 @@ public class OpenRegionHandler extends EventHandler {
|
|||
try {
|
||||
this.version =
|
||||
ZKAssign.retransitionNodeOpening(server.getZooKeeper(),
|
||||
this.regionInfo, this.server.getServerName(), this.version);
|
||||
this.regionInfo, this.server.getServerName(), this.version, tomActivated);
|
||||
} catch (KeeperException e) {
|
||||
server.abort("Exception refreshing OPENING; region=" + encodedName +
|
||||
", context=" + context, e);
|
||||
|
|
Loading…
Reference in New Issue