HBASE-3789 Cleanup the locking contention in the master
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1136140 13f79535-47bb-0310-9956-ffa450edef68
parent 2aade76735
commit 0a585ffbc8
@@ -263,6 +263,7 @@ Release 0.91.0 - Unreleased
   HBASE-3982  Improvements to TestHFileSeek
   HBASE-3940  HBase daemons should log version info at startup and possibly
               periodically (Li Pi)
+  HBASE-3789  Cleanup the locking contention in the master

 TASKS
   HBASE-3559  Move report of split to master OFF the heartbeat channel
@@ -38,6 +38,7 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -70,7 +71,6 @@ import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.hadoop.hbase.zookeeper.ZKTable;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.io.Writable;
@@ -446,10 +446,17 @@ public class AssignmentManager extends ZooKeeperListener {
         if (!isInStateForSplitting(regionState)) break;
         // If null, add SPLITTING state before going to SPLIT
         if (regionState == null) {
-          LOG.info("Received SPLIT for region " + prettyPrintedRegionName +
-            " from server " + sn +
+          regionState = addSplittingToRIT(sn, encodedName);
+          String message = "Received SPLIT for region " + prettyPrintedRegionName +
+            " from server " + sn;
+          // If still null, it means we cannot find it and it was already processed
+          if (regionState == null) {
+            LOG.warn(message + " but it doesn't exist anymore," +
+              " probably already processed its split");
+            break;
+          }
+          LOG.info(message +
             " but region was not first in SPLITTING state; continuing");
-          addSplittingToRIT(sn, encodedName);
         }
         // Check it has daughters.
         byte [] payload = data.getPayload();
@@ -589,9 +596,11 @@ public class AssignmentManager extends ZooKeeperListener {
     RegionState regionState = null;
+    synchronized (this.regions) {
       regionState = findHRegionInfoThenAddToRIT(serverName, encodedName);
       if (regionState != null) {
         regionState.update(RegionState.State.SPLITTING,
           System.currentTimeMillis(), serverName);
       }
+    }
     return regionState;
   }

@@ -689,7 +698,6 @@ public class AssignmentManager extends ZooKeeperListener {
   @Override
   public void nodeCreated(String path) {
     if(path.startsWith(watcher.assignmentZNode)) {
-      synchronized(regionsInTransition) {
       try {
         RegionTransitionData data = ZKAssign.getData(watcher, path);
         if (data == null) {
@@ -701,7 +709,6 @@ public class AssignmentManager extends ZooKeeperListener {
         }
       }
-      }
     }

   /**
    * Existing unassigned node has had data changed.
@@ -718,7 +725,6 @@ public class AssignmentManager extends ZooKeeperListener {
   @Override
   public void nodeDataChanged(String path) {
     if(path.startsWith(watcher.assignmentZNode)) {
-      synchronized(regionsInTransition) {
       try {
         RegionTransitionData data = ZKAssign.getData(watcher, path);
         if (data == null) {
@@ -730,7 +736,6 @@ public class AssignmentManager extends ZooKeeperListener {
         }
       }
-      }
     }

   @Override
   public void nodeDeleted(String path) {
@@ -762,26 +767,20 @@ public class AssignmentManager extends ZooKeeperListener {
    * <ol>
    *   <li>Watch the node for further children changed events</li>
    *   <li>Watch all new children for changed events</li>
-   *   <li>Read all children and handle them</li>
    * </ol>
    */
   @Override
   public void nodeChildrenChanged(String path) {
     if(path.equals(watcher.assignmentZNode)) {
-      synchronized(regionsInTransition) {
       try {
-        List<NodeAndData> newNodes = ZKUtil.watchAndGetNewChildren(watcher,
+        // Just make sure we see the changes for the new znodes
+        ZKUtil.listChildrenAndWatchThem(watcher,
           watcher.assignmentZNode);
-        for(NodeAndData newNode : newNodes) {
-          LOG.debug("Handling new unassigned node: " + newNode);
-          handleRegion(RegionTransitionData.fromBytes(newNode.getData()));
-        }
       } catch(KeeperException e) {
         master.abort("Unexpected ZK exception reading unassigned children", e);
       }
-      }
     }
   }

   /**
    * Marks the region as online.  Removes it from regions in transition and
@@ -843,11 +842,9 @@ public class AssignmentManager extends ZooKeeperListener {
         rs = this.regionsInTransition.get(e.getKey());
       }
       if (rs == null) continue;
-      synchronized (rs) {
         rs.updateTimestampToNow();
-      }
     }
   }

   /**
    * Marks the region as offline.  Removes it from regions in transition and
@@ -1311,9 +1308,20 @@ public class AssignmentManager extends ZooKeeperListener {
     synchronized (regionsInTransition) {
       state = regionsInTransition.get(encodedName);
       if (state == null) {
+
+        // Create the znode in CLOSING state
+        try {
+          ZKAssign.createNodeClosing(
+            master.getZooKeeper(), region, master.getServerName());
+        } catch (KeeperException e) {
+          master.abort("Unexpected ZK exception creating node CLOSING", e);
+          return;
+        }
         state = new RegionState(region, RegionState.State.PENDING_CLOSE);
         regionsInTransition.put(encodedName, state);
       } else if (force && state.isPendingClose()) {
+        // JD 05/25/11
+        // in my experience this is useless, when this happens it just spins
         LOG.debug("Attempting to unassign region " +
           region.getRegionNameAsString() + " which is already pending close "
           + "but forcing an additional close");
@@ -1893,9 +1901,7 @@ public class AssignmentManager extends ZooKeeperListener {
               " has been CLOSED for too long, waiting on queued " +
               "ClosedRegionHandler to run or server shutdown");
             // Update our timestamp.
-            synchronized(regionState) {
-              regionState.update(regionState.getState());
-            }
+            regionState.updateTimestampToNow();
             break;
           case OFFLINE:
             LOG.info("Region has been OFFLINE for too long, " +
@@ -2320,10 +2326,13 @@ public class AssignmentManager extends ZooKeeperListener {
     }

     private State state;
-    private long stamp;
+    // Many threads can update the state at the stamp at the same time
+    private final AtomicLong stamp;
     private ServerName serverName;

-    public RegionState() {}
+    public RegionState() {
+      this.stamp = new AtomicLong(System.currentTimeMillis());
+    }

     RegionState(HRegionInfo region, State state) {
       this(region, state, System.currentTimeMillis(), null);
@@ -2332,24 +2341,28 @@ public class AssignmentManager extends ZooKeeperListener {
     RegionState(HRegionInfo region, State state, long stamp, ServerName serverName) {
       this.region = region;
       this.state = state;
-      this.stamp = stamp;
+      this.stamp = new AtomicLong(stamp);
       this.serverName = serverName;
     }

     public void update(State state, long stamp, ServerName serverName) {
       this.state = state;
-      this.stamp = stamp;
+      updateTimestamp(stamp);
       this.serverName = serverName;
     }

     public void update(State state) {
       this.state = state;
-      this.stamp = System.currentTimeMillis();
+      updateTimestampToNow();
       this.serverName = null;
     }

+    public void updateTimestamp(long stamp) {
+      this.stamp.set(stamp);
+    }
+
     public void updateTimestampToNow() {
-      this.stamp = System.currentTimeMillis();
+      this.stamp.set(System.currentTimeMillis());
     }

     public State getState() {
@@ -2357,7 +2370,7 @@ public class AssignmentManager extends ZooKeeperListener {
     }

     public long getStamp() {
-      return stamp;
+      return stamp.get();
     }

     public HRegionInfo getRegion() {
@@ -2413,14 +2426,14 @@ public class AssignmentManager extends ZooKeeperListener {
       region = new HRegionInfo();
       region.readFields(in);
       state = State.valueOf(in.readUTF());
-      stamp = in.readLong();
+      stamp.set(in.readLong());
     }

     @Override
     public void write(DataOutput out) throws IOException {
       region.write(out);
       out.writeUTF(state.name());
-      out.writeLong(stamp);
+      out.writeLong(stamp.get());
     }
   }
@@ -304,14 +304,36 @@ public class SplitTransaction {
     // Tell master about split by updating zk.  If we fail, abort.
     if (server != null && server.getZooKeeper() != null) {
       try {
-        transitionNodeSplit(server.getZooKeeper(), parent.getRegionInfo(),
-          a.getRegionInfo(), b.getRegionInfo(), server.getServerName(),
-          this.znodeVersion);
+        this.znodeVersion = transitionNodeSplit(server.getZooKeeper(),
+          parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
+          server.getServerName(), this.znodeVersion);
+
+        int spins = 0;
+        // Now wait for the master to process the split. We know it's done
+        // when the znode is deleted. The reason we keep tickling the znode is
+        // that it's possible for the master to miss an event.
+        do {
+          if (spins % 10 == 0) {
+            LOG.info("Still waiting on the master to process the split for " +
+              this.parent.getRegionInfo().getEncodedName());
+          }
+          Thread.sleep(100);
+          // When this returns -1 it means the znode doesn't exist
+          this.znodeVersion = tickleNodeSplit(server.getZooKeeper(),
+            parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
+            server.getServerName(), this.znodeVersion);
+          spins++;
+        } while (this.znodeVersion != -1);
       } catch (Exception e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
         server.abort("Failed telling master about split", e);
       }
     }
+

     // Coprocessor callback
     if (this.parent.getCoprocessorHost() != null) {
       this.parent.getCoprocessorHost().postSplit(a,b);
@@ -701,14 +723,11 @@ public class SplitTransaction {
     RegionTransitionData data =
       new RegionTransitionData(EventType.RS_ZK_REGION_SPLITTING,
         region.getRegionName(), serverName);
-    // This synchronization is copied from ZKAssign.
-    synchronized(zkw.getNodes()) {
+
     String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
-    zkw.getNodes().add(node);
     if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, data.getBytes())) {
       throw new IOException("Failed create of ephemeral " + node);
     }
-    }
     // Transition node from SPLITTING to SPLITTING and pick up version so we
     // can be sure this znode is ours; version is needed deleting.
     return transitionNodeSplitting(zkw, region, serverName, -1);
@@ -764,4 +783,14 @@ public class SplitTransaction {
     return ZKAssign.transitionNode(zkw, parent, serverName,
       EventType.RS_ZK_REGION_SPLITTING, EventType.RS_ZK_REGION_SPLITTING, version);
   }
+
+  private static int tickleNodeSplit(ZooKeeperWatcher zkw,
+      HRegionInfo parent, HRegionInfo a, HRegionInfo b, ServerName serverName,
+      final int znodeVersion)
+  throws KeeperException, IOException {
+    byte [] payload = Writables.getBytes(a, b);
+    return ZKAssign.transitionNode(zkw, parent, serverName,
+      EventType.RS_ZK_REGION_SPLIT, EventType.RS_ZK_REGION_SPLIT,
+      znodeVersion, payload);
+  }
 }
@@ -109,7 +109,7 @@ public class CloseRegionHandler extends EventHandler {

     int expectedVersion = FAILED;
     if (this.zk) {
-      expectedVersion = setClosingState();
+      expectedVersion = getCurrentVersion();
       if (expectedVersion == FAILED) return;
     }

@@ -169,16 +169,16 @@ public class CloseRegionHandler extends EventHandler {
   }

   /**
-   * Create ZK node in CLOSING state.
-   * @return The expectedVersion.  If -1, we failed setting CLOSING.
+   * Get the node's current version
+   * @return The expectedVersion.  If -1, we failed getting the node
    */
-  private int setClosingState() {
+  private int getCurrentVersion() {
     int expectedVersion = FAILED;
     try {
-      if ((expectedVersion = ZKAssign.createNodeClosing(
-          server.getZooKeeper(), regionInfo, server.getServerName())) == FAILED) {
-        LOG.warn("Error creating node in CLOSING state, aborting close of " +
-          regionInfo.getRegionNameAsString());
+      if ((expectedVersion = ZKAssign.getVersion(
+          server.getZooKeeper(), regionInfo)) == FAILED) {
+        LOG.warn("Error getting node's version in CLOSING state," +
+          " aborting close of " + regionInfo.getRegionNameAsString());
       }
     } catch (KeeperException e) {
       LOG.warn("Error creating node in CLOSING state, aborting close of " +
@@ -95,6 +95,7 @@ public class HBaseFsck {
   // Empty regioninfo qualifiers in .META.
   private Set<Result> emptyRegionInfoQualifiers = new HashSet<Result>();
   private int numThreads = MAX_NUM_THREADS;
+  private final HBaseAdmin admin;

   ThreadPoolExecutor executor; // threads to retrieve data from regionservers
@@ -109,7 +110,7 @@ public class HBaseFsck {
     throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
     this.conf = conf;

-    HBaseAdmin admin = new HBaseAdmin(conf);
+    admin = new HBaseAdmin(conf);
     status = admin.getMaster().getClusterStatus();
     connection = admin.getConnection();
@@ -466,7 +467,7 @@ public class HBaseFsck {
       if (shouldFix()) {
         errors.print("Trying to fix unassigned region...");
         setShouldRerun();
-        HBaseFsckRepair.fixUnassigned(this.conf, hbi.metaEntry);
+        HBaseFsckRepair.fixUnassigned(this.admin, hbi.metaEntry);
       }
     } else if (inMeta && inHdfs && isDeployed && !shouldBeDeployed) {
       errors.reportError(ERROR_CODE.SHOULD_NOT_BE_DEPLOYED, "Region "
@@ -481,7 +482,7 @@ public class HBaseFsck {
       if (shouldFix()) {
         errors.print("Trying to fix assignment error...");
         setShouldRerun();
-        HBaseFsckRepair.fixDupeAssignment(this.conf, hbi.metaEntry, hbi.deployedOn);
+        HBaseFsckRepair.fixDupeAssignment(this.admin, hbi.metaEntry, hbi.deployedOn);
       }
     } else if (inMeta && inHdfs && isDeployed && !deploymentMatchesMeta) {
       errors.reportError(ERROR_CODE.SERVER_DOES_NOT_MATCH_META, "Region "
@@ -492,7 +493,7 @@ public class HBaseFsck {
       if (shouldFix()) {
         errors.print("Trying to fix assignment error...");
         setShouldRerun();
-        HBaseFsckRepair.fixDupeAssignment(this.conf, hbi.metaEntry, hbi.deployedOn);
+        HBaseFsckRepair.fixDupeAssignment(this.admin, hbi.metaEntry, hbi.deployedOn);
       }
     } else {
       errors.reportError(ERROR_CODE.UNKNOWN, "Region " + descriptiveName +
@@ -735,7 +736,7 @@ public class HBaseFsck {
         errors.print("Trying to fix a problem with .META...");
         setShouldRerun();
         // try to fix it (treat it as unassigned region)
-        HBaseFsckRepair.fixUnassigned(conf, root.metaEntry);
+        HBaseFsckRepair.fixUnassigned(this.admin, root.metaEntry);
       }
     }
     // If there are more than one regions pretending to hold the .META.
@@ -749,7 +750,7 @@ public class HBaseFsck {
         for (HbckInfo mRegion : metaRegions) {
           deployedOn.add(mRegion.metaEntry.regionServer);
         }
-        HBaseFsckRepair.fixDupeAssignment(conf, root.metaEntry, deployedOn);
+        HBaseFsckRepair.fixDupeAssignment(this.admin, root.metaEntry, deployedOn);
       }
     }
     // rerun hbck with hopefully fixed META
@@ -23,16 +23,14 @@ import java.io.IOException;
 import java.util.List;

 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
-import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.zookeeper.KeeperException;

 public class HBaseFsckRepair {
@@ -41,14 +39,14 @@ public class HBaseFsckRepair {
    * Fix dupe assignment by doing silent closes on each RS hosting the region
    * and then force ZK unassigned node to OFFLINE to trigger assignment by
    * master.
-   * @param conf
+   * @param admin
    * @param region
    * @param servers
    * @throws IOException
    * @throws KeeperException
    * @throws InterruptedException
    */
-  public static void fixDupeAssignment(Configuration conf, HRegionInfo region,
+  public static void fixDupeAssignment(HBaseAdmin admin, HRegionInfo region,
     List<ServerName> servers)
   throws IOException, KeeperException, InterruptedException {
@@ -56,45 +54,33 @@ public class HBaseFsckRepair {

     // Close region on the servers silently
     for(ServerName server : servers) {
-      closeRegionSilentlyAndWait(conf, server, actualRegion);
+      closeRegionSilentlyAndWait(admin.getConfiguration(), server, actualRegion);
     }

     // Force ZK node to OFFLINE so master assigns
-    forceOfflineInZK(conf, actualRegion);
+    forceOfflineInZK(admin, actualRegion);
   }

   /**
    * Fix unassigned by creating/transition the unassigned ZK node for this
    * region to OFFLINE state with a special flag to tell the master that this
    * is a forced operation by HBCK.
-   * @param conf
+   * @param admin
    * @param region
    * @throws IOException
    * @throws KeeperException
    */
-  public static void fixUnassigned(Configuration conf, HRegionInfo region)
+  public static void fixUnassigned(HBaseAdmin admin, HRegionInfo region)
   throws IOException, KeeperException {
     HRegionInfo actualRegion = new HRegionInfo(region);

     // Force ZK node to OFFLINE so master assigns
-    forceOfflineInZK(conf, actualRegion);
+    forceOfflineInZK(admin, actualRegion);
   }

-  private static void forceOfflineInZK(Configuration conf, final HRegionInfo region)
+  private static void forceOfflineInZK(HBaseAdmin admin, final HRegionInfo region)
   throws ZooKeeperConnectionException, KeeperException, IOException {
-    HConnectionManager.execute(new HConnectable<Void>(conf) {
-      @Override
-      public Void connect(HConnection connection) throws IOException {
-        try {
-          ZKAssign.createOrForceNodeOffline(
-            connection.getZooKeeperWatcher(),
-            region, HConstants.HBCK_CODE_SERVERNAME);
-        } catch (KeeperException ke) {
-          throw new IOException(ke);
-        }
-        return null;
-      }
-    });
+    admin.assign(region.getRegionName(), true);
   }

   private static void closeRegionSilentlyAndWait(Configuration conf,
@@ -66,16 +66,16 @@ public class RegionServerTracker extends ZooKeeperListener {
    */
   public void start() throws KeeperException, IOException {
     watcher.registerListener(this);
-    List<NodeAndData> servers =
-      ZKUtil.watchAndGetNewChildren(watcher, watcher.rsZNode);
+    List<String> servers =
+      ZKUtil.listChildrenAndWatchThem(watcher, watcher.rsZNode);
     add(servers);
   }

-  private void add(final List<NodeAndData> servers) throws IOException {
+  private void add(final List<String> servers) throws IOException {
     synchronized(this.regionServers) {
       this.regionServers.clear();
-      for (NodeAndData n: servers) {
-        ServerName sn = new ServerName(ZKUtil.getNodeName(n.getNode()));
+      for (String n: servers) {
+        ServerName sn = new ServerName(ZKUtil.getNodeName(n));
         this.regionServers.add(sn);
       }
     }
@@ -107,8 +107,8 @@ public class RegionServerTracker extends ZooKeeperListener {
   public void nodeChildrenChanged(String path) {
     if (path.equals(watcher.rsZNode)) {
       try {
-        List<NodeAndData> servers =
-          ZKUtil.watchAndGetNewChildren(watcher, watcher.rsZNode);
+        List<String> servers =
+          ZKUtil.listChildrenAndWatchThem(watcher, watcher.rsZNode);
         add(servers);
       } catch (IOException e) {
         abortable.abort("Unexpected zk exception getting RS nodes", e);
@@ -143,12 +143,9 @@ public class ZKAssign {
       region.getEncodedName() + " in OFFLINE state"));
     RegionTransitionData data = new RegionTransitionData(event,
       region.getRegionName(), serverName);
-    synchronized(zkw.getNodes()) {
     String node = getNodeName(zkw, region.getEncodedName());
-    zkw.getNodes().add(node);
     ZKUtil.createAndWatch(zkw, node, data.getBytes());
-    }
   }

   /**
    * Creates an unassigned node in the OFFLINE state for the specified region.
@@ -173,12 +170,9 @@ public class ZKAssign {
       region.getEncodedName() + " with OFFLINE state"));
     RegionTransitionData data = new RegionTransitionData(
       EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName);
-    synchronized(zkw.getNodes()) {
     String node = getNodeName(zkw, region.getEncodedName());
-    zkw.getNodes().add(node);
     ZKUtil.asyncCreate(zkw, node, data.getBytes(), cb, ctx);
-    }
   }

   /**
    * Forces an existing unassigned node to the OFFLINE state for the specified
@@ -205,12 +199,9 @@ public class ZKAssign {
       region.getEncodedName() + " to OFFLINE state"));
     RegionTransitionData data = new RegionTransitionData(
       EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName);
-    synchronized(zkw.getNodes()) {
     String node = getNodeName(zkw, region.getEncodedName());
-    zkw.getNodes().add(node);
     ZKUtil.setData(zkw, node, data.getBytes());
-    }
   }


   /**
@@ -238,10 +229,8 @@ public class ZKAssign {
       region.getEncodedName() + " with OFFLINE state"));
     RegionTransitionData data = new RegionTransitionData(
       EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName);
-    synchronized(zkw.getNodes()) {
     String node = getNodeName(zkw, region.getEncodedName());
     zkw.sync(node);
-    zkw.getNodes().add(node);
     int version = ZKUtil.checkExists(zkw, node);
     if (version == -1) {
       ZKUtil.createAndWatch(zkw, node, data.getBytes());
@@ -259,7 +248,6 @@ public class ZKAssign {
         }
       }
-      }
     }
     return true;
   }

@@ -408,9 +396,6 @@ public class ZKAssign {
         " state but node is in " + data.getEventType() + " state"));
       return false;
     }
-    synchronized(zkw.getNodes()) {
-      // TODO: Does this go here or only if we successfully delete node?
-      zkw.getNodes().remove(node);
     if(!ZKUtil.deleteNode(zkw, node, stat.getVersion())) {
       LOG.warn(zkw.prefix("Attempting to delete " +
         "unassigned node in " + expectedState +
@@ -421,7 +406,6 @@ public class ZKAssign {
       regionName + " in expected state " + expectedState));
     return true;
   }
-  }

   /**
    * Deletes all unassigned nodes regardless of their state.
@@ -473,12 +457,9 @@ public class ZKAssign {
     RegionTransitionData data = new RegionTransitionData(
       EventType.RS_ZK_REGION_CLOSING, region.getRegionName(), serverName);

-    synchronized (zkw.getNodes()) {
     String node = getNodeName(zkw, region.getEncodedName());
-    zkw.getNodes().add(node);
     return ZKUtil.createAndWatch(zkw, node, data.getBytes());
-    }
   }

   /**
    * Transitions an existing unassigned node for the specified region which is
@@ -782,6 +763,19 @@ public class ZKAssign {
     return RegionTransitionData.fromBytes(data);
   }

+  /**
+   * Get the version of the specified znode
+   * @param zkw zk reference
+   * @param region region's info
+   * @return the version of the znode, -1 if it doesn't exist
+   * @throws KeeperException
+   */
+  public static int getVersion(ZooKeeperWatcher zkw, HRegionInfo region)
+  throws KeeperException {
+    String znode = getNodeName(zkw, region.getEncodedName());
+    return ZKUtil.checkExists(zkw, znode);
+  }
+
   /**
    * Delete the assignment node regardless of its current state.
    * <p>
@@ -348,30 +348,6 @@ public class ZKUtil {
     return children;
   }

-  /**
-   * Atomically add watches and read data from all unwatched unassigned nodes.
-   *
-   * <p>This works because master is the only person deleting nodes.
-   */
-  public static List<NodeAndData> watchAndGetNewChildren(ZooKeeperWatcher zkw,
-      String baseNode)
-  throws KeeperException {
-    List<NodeAndData> newNodes = new ArrayList<NodeAndData>();
-    synchronized(zkw.getNodes()) {
-      List<String> nodes =
-        ZKUtil.listChildrenAndWatchForNewChildren(zkw, baseNode);
-      for(String node : nodes) {
-        String nodePath = ZKUtil.joinZNode(baseNode, node);
-        if(!zkw.getNodes().contains(nodePath)) {
-          byte [] data = ZKUtil.getDataAndWatch(zkw, nodePath);
-          newNodes.add(new NodeAndData(nodePath, data));
-          zkw.getNodes().add(nodePath);
-        }
-      }
-    }
-    return newNodes;
-  }
-
   /**
    * Simple class to hold a node path and node data.
    */
@@ -20,7 +20,6 @@
 package org.apache.hadoop.hbase.zookeeper;

 import java.io.IOException;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -68,9 +67,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
   private final List<ZooKeeperListener> listeners =
     new CopyOnWriteArrayList<ZooKeeperListener>();

-  // set of unassigned nodes watched
-  private Set<String> unassignedNodes = new HashSet<String>();
-
   // node names

   // base znode for this cluster
@@ -372,14 +368,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
     this.zooKeeper.sync(path, null, null);
   }

-  /**
-   * Get the set of already watched unassigned nodes.
-   * @return Set of Nodes.
-   */
-  public Set<String> getNodes() {
-    return unassignedNodes;
-  }
-
   /**
    * Handles KeeperExceptions in client calls.
    * <p>
@@ -97,13 +97,6 @@ public class MiniHBaseCluster {
       this.user = User.getCurrent();
     }

-    @Override
-    public boolean closeRegion(HRegionInfo region)
-    throws IOException {
-      if (TEST_SKIP_CLOSE) return true;
-      return super.closeRegion(region);
-    }
-
     /*
      * @param c
      * @param currentfs We return this if we did not make a new one.
@@ -88,69 +88,6 @@ public class TestSplitTransactionOnCluster {
     return regions.get(0).getRegionInfo();
   }

-  /**
-   * Test what happens if master goes to balance a region just as regionserver
-   * goes to split it.  The PENDING_CLOSE state is the strange one since its
-   * in the Master's head only, not out in zk.  Test this case.
-   * @throws IOException
-   * @throws InterruptedException
-   * @throws NodeExistsException
-   * @throws KeeperException
-   */
-  @Test (timeout = 600000) public void testPendingCloseAndSplit()
-  throws IOException, InterruptedException, NodeExistsException, KeeperException {
-    final byte [] tableName = Bytes.toBytes("pendingCloseAndSplit");
-
-    // Create table then get the single region for our new table.
-    HTable t = TESTING_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY);
-
-    List<HRegion> regions = cluster.getRegions(tableName);
-    HRegionInfo hri = getAndCheckSingleTableRegion(regions);
-
-    int tableRegionIndex = ensureTableRegionNotOnSameServerAsMeta(admin, hri);
-
-    // Turn off balancer so it doesn't cut in and mess up our placements.
-    this.admin.balanceSwitch(false);
-    // Turn off the meta scanner so it don't remove parent on us.
-    this.cluster.getMaster().setCatalogJanitorEnabled(false);
-    try {
-      // Add a bit of load up into the table so splittable.
-      TESTING_UTIL.loadTable(t, HConstants.CATALOG_FAMILY);
-      // Get region pre-split.
-      HRegionServer server = cluster.getRegionServer(tableRegionIndex);
-      printOutRegions(server, "Initial regions: ");
-      int regionCount = server.getOnlineRegions().size();
-      // Now send in a close of a region but first make the close on the regionserver
-      // a NOOP.  This way the master has all the state of it going to close
-      // but then a SPLITTING arrives.  This is what we want to test.
-      // Here is how we turn CLOSE into NOOP in test.
-      MiniHBaseCluster.MiniHBaseClusterRegionServer.TEST_SKIP_CLOSE = true;
-      this.cluster.getMaster().unassign(hri.getRegionName(), false);
-      // Now try splitting and it should work.
-      LOG.info("Running split on server " + server.toString());
-      split(hri, server, regionCount);
-      // Get daughters
-      List<HRegion> daughters = this.cluster.getRegions(tableName);
-      assertTrue(daughters.size() >= 2);
-      // Assert the ephemeral node is gone in zk.
-      String path = ZKAssign.getNodeName(t.getConnection().getZooKeeperWatcher(),
-        hri.getEncodedName());
-      Stat stat = null;
-      for (int i = 0; i < 10; i++) {
-        stat = t.getConnection().getZooKeeperWatcher().getZooKeeper().exists(path, false);
-        LOG.info("Stat for znode path=" + path + ": " + stat);
-        if (stat == null) break;
-        org.apache.hadoop.hbase.util.Threads.sleep(100);
-      }
-      assertTrue(stat == null);
-    } finally {
-      // Set this flag back.
-      MiniHBaseCluster.MiniHBaseClusterRegionServer.TEST_SKIP_CLOSE = false;
-      admin.balanceSwitch(true);
-      cluster.getMaster().setCatalogJanitorEnabled(true);
-    }
-  }
-
   /**
    * A test that intentionally has master fail the processing of the split message.
    * Tests that the regionserver split ephemeral node gets cleaned up if it
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.util;

 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;

 import java.io.IOException;
 import java.util.ArrayList;
@@ -37,7 +38,9 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.UnknownRegionException;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -168,7 +171,14 @@ public class TestHBaseFsck {
     Thread.sleep(1 * 1000);
     ArrayList servers = new ArrayList();
     servers.add(rsAddressOrig);
-    HBaseFsckRepair.fixDupeAssignment(conf, hriOrig, servers);
+    try {
+      HBaseFsckRepair.fixDupeAssignment(TEST_UTIL.getHBaseAdmin(), hriOrig, servers);
+    } catch (IOException ex) {
+      ex = RemoteExceptionHandler.checkIOException(ex);
+      if (!(ex instanceof UnknownRegionException)) {
+        fail("Unexpected exception: " + ex);
+      }
+    }

     // We created 1 table, should be fine
     assertNoErrors(doFsck(false));