HBASE-6109 Improve RIT performances during assignment on large clusters (N Keywal)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1344802 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
15b1bea883
commit
ff2bcd4760
|
@ -32,7 +32,7 @@ AssignmentManager assignmentManager;
|
|||
int limit = 100;
|
||||
</%args>
|
||||
<%java>
|
||||
Map<String, RegionState> rit = assignmentManager.getRegionsInTransition();
|
||||
Map<String, RegionState> rit = assignmentManager.copyRegionsInTransition();
|
||||
// process the map to find region in transition details
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
int ritThreshold = conf.getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
|
||||
|
|
|
@ -37,11 +37,12 @@ import java.util.Set;
|
|||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -77,6 +78,7 @@ import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
|
|||
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
||||
import org.apache.hadoop.hbase.ServerLoad;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.KeyLocker;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
|
@ -102,7 +104,6 @@ import org.apache.zookeeper.data.Stat;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class AssignmentManager extends ZooKeeperListener {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
|
||||
|
||||
public static final ServerName HBCK_CODE_SERVERNAME = new ServerName(HConstants.HBCK_CODE_NAME,
|
||||
|
@ -120,6 +121,8 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
|
||||
private LoadBalancer balancer;
|
||||
|
||||
final private KeyLocker<String> locker = new KeyLocker<String>();
|
||||
|
||||
/**
|
||||
* Map of regions to reopen after the schema of a table is changed. Key -
|
||||
* encoded region name, value - HRegionInfo
|
||||
|
@ -135,8 +138,8 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
* Regions currently in transition. Map of encoded region names to the master
|
||||
* in-memory state for that region.
|
||||
*/
|
||||
final ConcurrentSkipListMap<String, RegionState> regionsInTransition =
|
||||
new ConcurrentSkipListMap<String, RegionState>();
|
||||
final NotifiableConcurrentSkipListMap<String, RegionState> regionsInTransition =
|
||||
new NotifiableConcurrentSkipListMap<String, RegionState>();
|
||||
|
||||
/** Plans for region movement. Key is the encoded version of a region name*/
|
||||
// TODO: When do plans get cleaned out? Ever? In server open and in server
|
||||
|
@ -148,9 +151,9 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
private final ZKTable zkTable;
|
||||
|
||||
// store all the table names in disabling state
|
||||
Set<String> disablingTables = new HashSet<String>(1);
|
||||
Set<String> disablingTables = new HashSet<String>();
|
||||
// store all the enabling state tablenames.
|
||||
Set<String> enablingTables = new HashSet<String>(1);
|
||||
Set<String> enablingTables = new HashSet<String>();
|
||||
|
||||
/**
|
||||
* Server to regions assignment map.
|
||||
|
@ -332,6 +335,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
Integer pending = 0;
|
||||
for (HRegionInfo hri : hris) {
|
||||
String name = hri.getEncodedName();
|
||||
// no lock concurrent access ok: sequential consistency respected.
|
||||
if (regionsToReopen.containsKey(name) || regionsInTransition.containsKey(name)) {
|
||||
pending++;
|
||||
}
|
||||
|
@ -438,9 +442,8 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
|
||||
// Remove regions in RIT, they are possibly being processed by
|
||||
// ServerShutdownHandler.
|
||||
synchronized (regionsInTransition) {
|
||||
nodes.removeAll(regionsInTransition.keySet());
|
||||
}
|
||||
// no lock concurrent access ok: some threads may be adding/removing items but its java-valid
|
||||
nodes.removeAll(regionsInTransition.keySet());
|
||||
|
||||
// If we found user regions out on cluster, its a failover.
|
||||
if (this.failover) {
|
||||
|
@ -475,12 +478,12 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
processRegionInTransition(hri.getEncodedName(), hri, null);
|
||||
if (!intransistion) return intransistion;
|
||||
LOG.debug("Waiting on " + HRegionInfo.prettyPrint(hri.getEncodedName()));
|
||||
synchronized(this.regionsInTransition) {
|
||||
while (!this.master.isStopped() &&
|
||||
this.regionsInTransition.containsKey(hri.getEncodedName())) {
|
||||
// We expect a notify, but by security we set a timout
|
||||
this.regionsInTransition.wait(100);
|
||||
}
|
||||
while (!this.master.isStopped() &&
|
||||
// no lock concurrent access ok: sequentially consistent
|
||||
this.regionsInTransition.containsKey(hri.getEncodedName())) {
|
||||
// We put a timeout because we may have the region getting in just between the test
|
||||
// and the waitForUpdate
|
||||
this.regionsInTransition.waitForUpdate(100);
|
||||
}
|
||||
return intransistion;
|
||||
}
|
||||
|
@ -524,10 +527,15 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
ServerName sn = rt.getServerName();
|
||||
String encodedRegionName = regionInfo.getEncodedName();
|
||||
LOG.info("Processing region " + regionInfo.getRegionNameAsString() + " in state " + et);
|
||||
synchronized (regionsInTransition) {
|
||||
|
||||
// We need a lock here to ensure that we will not put the same region twice
|
||||
// It has no reason to be a lock shared with the other operations.
|
||||
// We can do the lock on the region only, instead of a global lock: what we want to ensure
|
||||
// is that we don't have two threads working on the same region.
|
||||
Lock lock = locker.acquireLock(encodedRegionName);
|
||||
try {
|
||||
RegionState regionState = regionsInTransition.get(encodedRegionName);
|
||||
if (regionState != null ||
|
||||
failoverProcessedRegions.containsKey(encodedRegionName)) {
|
||||
if (regionState != null || failoverProcessedRegions.containsKey(encodedRegionName)) {
|
||||
// Just return
|
||||
return;
|
||||
}
|
||||
|
@ -575,7 +583,6 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
case RS_ZK_REGION_OPENING:
|
||||
// TODO: Could check if it was on deadServers. If it was, then we could
|
||||
// do what happens in TimeoutMonitor when it sees this condition.
|
||||
|
||||
// Just insert region into RIT
|
||||
// If this never updates the timeout will trigger new assignment
|
||||
if (regionInfo.isMetaTable()) {
|
||||
|
@ -628,11 +635,17 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
default:
|
||||
throw new IllegalStateException("Received region in state :" + et + " is not valid");
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Put the region <code>hri</code> into an offline state up in zk.
|
||||
*
|
||||
* You need to have lock on the region before calling this method.
|
||||
*
|
||||
* @param hri
|
||||
* @param oldRt
|
||||
* @throws KeeperException
|
||||
|
@ -656,7 +669,10 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
* @param oldData
|
||||
*/
|
||||
private void addToRITandCallClose(final HRegionInfo hri,
|
||||
final RegionState.State state, final RegionTransition oldData) {
|
||||
final RegionState.State state, final RegionTransition oldData) {
|
||||
// No lock concurrency: adding a synchronized here would not prevent to have two
|
||||
// entries as we don't check if the region is already there. This must be ensured by the
|
||||
// method callers.
|
||||
this.regionsInTransition.put(hri.getEncodedName(), getRegionState(hri, state, oldData));
|
||||
new ClosedRegionHandler(this.master, this, hri).process();
|
||||
}
|
||||
|
@ -677,10 +693,8 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
* @param hri HRegionInfo of the region which was closed
|
||||
*/
|
||||
public void removeClosedRegion(HRegionInfo hri) {
|
||||
if (!regionsToReopen.isEmpty()) {
|
||||
if (regionsToReopen.remove(hri.getEncodedName()) != null) {
|
||||
LOG.debug("Removed region from reopening regions because it was closed");
|
||||
}
|
||||
if (regionsToReopen.remove(hri.getEncodedName()) != null) {
|
||||
LOG.debug("Removed region from reopening regions because it was closed");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -713,41 +727,45 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
* @param expectedVersion
|
||||
*/
|
||||
private void handleRegion(final RegionTransition rt, int expectedVersion) {
|
||||
synchronized(regionsInTransition) {
|
||||
HRegionInfo hri = null;
|
||||
if (rt == null) {
|
||||
LOG.warn("Unexpected NULL input " + rt);
|
||||
return;
|
||||
}
|
||||
final ServerName sn = rt.getServerName();
|
||||
if (sn == null) {
|
||||
LOG.warn("Null servername: " + rt);
|
||||
return;
|
||||
}
|
||||
// Check if this is a special HBCK transition
|
||||
if (sn.equals(HBCK_CODE_SERVERNAME)) {
|
||||
handleHBCK(rt);
|
||||
return;
|
||||
}
|
||||
final long createTime = rt.getCreateTime();
|
||||
final byte [] regionName = rt.getRegionName();
|
||||
String encodedName = HRegionInfo.encodeRegionName(regionName);
|
||||
String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
|
||||
// Verify this is a known server
|
||||
if (!serverManager.isServerOnline(sn) &&
|
||||
!this.master.getServerName().equals(sn)
|
||||
&& !ignoreStatesRSOffline.contains(rt.getEventType())) {
|
||||
LOG.warn("Attempted to handle region transition for server but " +
|
||||
"server is not online: " + prettyPrintedRegionName);
|
||||
return;
|
||||
}
|
||||
HRegionInfo hri = null;
|
||||
if (rt == null) {
|
||||
LOG.warn("Unexpected NULL input " + rt);
|
||||
return;
|
||||
}
|
||||
final ServerName sn = rt.getServerName();
|
||||
if (sn == null) {
|
||||
LOG.warn("Null servername: " + rt);
|
||||
return;
|
||||
}
|
||||
// Check if this is a special HBCK transition
|
||||
if (sn.equals(HBCK_CODE_SERVERNAME)) {
|
||||
handleHBCK(rt);
|
||||
return;
|
||||
}
|
||||
final long createTime = rt.getCreateTime();
|
||||
final byte[] regionName = rt.getRegionName();
|
||||
String encodedName = HRegionInfo.encodeRegionName(regionName);
|
||||
String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
|
||||
// Verify this is a known server
|
||||
if (!serverManager.isServerOnline(sn) &&
|
||||
!this.master.getServerName().equals(sn)
|
||||
&& !ignoreStatesRSOffline.contains(rt.getEventType())) {
|
||||
LOG.warn("Attempted to handle region transition for server but " +
|
||||
"server is not online: " + prettyPrintedRegionName);
|
||||
return;
|
||||
}
|
||||
|
||||
// We need a lock on the region as we could update it
|
||||
Lock lock = locker.acquireLock(encodedName);
|
||||
try {
|
||||
// Printing if the event was created a long time ago helps debugging
|
||||
boolean lateEvent = createTime < (System.currentTimeMillis() - 15000);
|
||||
RegionState regionState = regionsInTransition.get(encodedName);
|
||||
LOG.debug("Handling transition=" + rt.getEventType() +
|
||||
", server=" + sn + ", region=" +
|
||||
(prettyPrintedRegionName == null? "null": prettyPrintedRegionName) +
|
||||
(lateEvent? ", which is more than 15 seconds late" : ""));
|
||||
RegionState regionState = regionsInTransition.get(encodedName);
|
||||
(prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
|
||||
(lateEvent ? ", which is more than 15 seconds late" : "") +
|
||||
", current state from RIT=" + regionState);
|
||||
switch (rt.getEventType()) {
|
||||
case M_ZK_REGION_OFFLINE:
|
||||
// Nothing to do.
|
||||
|
@ -924,6 +942,8 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
default:
|
||||
throw new IllegalStateException("Received event is not valid.");
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1091,6 +1111,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
LOG.warn("Received unexpected region state from HBCK: " + rt.toString());
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// ZooKeeper events
|
||||
|
@ -1148,7 +1169,9 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
public void nodeDeleted(final String path) {
|
||||
if (path.startsWith(this.watcher.assignmentZNode)) {
|
||||
String regionName = ZKAssign.getRegionName(this.master.getZooKeeper(), path);
|
||||
RegionState rs = this.regionsInTransition.get(regionName);
|
||||
// no lock concurrency ok: sequentially consistent if someone adds/removes the region in
|
||||
// the same time
|
||||
RegionState rs = this.regionsInTransition.get(regionName);
|
||||
if (rs != null) {
|
||||
HRegionInfo regionInfo = rs.getRegion();
|
||||
if (rs.isSplit()) {
|
||||
|
@ -1164,6 +1187,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void makeRegionOnline(RegionState rs, HRegionInfo regionInfo) {
|
||||
|
@ -1214,13 +1238,9 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
* @param sn
|
||||
*/
|
||||
void regionOnline(HRegionInfo regionInfo, ServerName sn) {
|
||||
synchronized (this.regionsInTransition) {
|
||||
RegionState rs =
|
||||
this.regionsInTransition.remove(regionInfo.getEncodedName());
|
||||
if (rs != null) {
|
||||
this.regionsInTransition.notifyAll();
|
||||
}
|
||||
}
|
||||
// no lock concurrency ok.
|
||||
this.regionsInTransition.remove(regionInfo.getEncodedName());
|
||||
|
||||
synchronized (this.regions) {
|
||||
// Add check
|
||||
ServerName oldSn = this.regions.get(regionInfo);
|
||||
|
@ -1265,23 +1285,26 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
* @param sn
|
||||
*/
|
||||
private void updateTimers(final ServerName sn) {
|
||||
if (sn == null) return;
|
||||
|
||||
// This loop could be expensive.
|
||||
// First make a copy of current regionPlan rather than hold sync while
|
||||
// looping because holding sync can cause deadlock. Its ok in this loop
|
||||
// if the Map we're going against is a little stale
|
||||
Map<String, RegionPlan> copy = new HashMap<String, RegionPlan>();
|
||||
List<Map.Entry<String, RegionPlan>> rps;
|
||||
synchronized(this.regionPlans) {
|
||||
copy.putAll(this.regionPlans);
|
||||
rps = new ArrayList<Map.Entry<String, RegionPlan>>(regionPlans.entrySet());
|
||||
}
|
||||
for (Map.Entry<String, RegionPlan> e: copy.entrySet()) {
|
||||
if (e.getValue() == null || e.getValue().getDestination() == null) continue;
|
||||
if (!e.getValue().getDestination().equals(sn)) continue;
|
||||
RegionState rs = null;
|
||||
synchronized (this.regionsInTransition) {
|
||||
rs = this.regionsInTransition.get(e.getKey());
|
||||
|
||||
for (Map.Entry<String, RegionPlan> e : rps) {
|
||||
if (e.getValue() != null && e.getKey() != null && sn.equals(e.getValue().getDestination())) {
|
||||
RegionState rs = this.regionsInTransition.get(e.getKey());
|
||||
if (rs != null) {
|
||||
// no lock concurrency ok: there is a write when we update the timestamp but it's ok
|
||||
// as it's an AtomicLong
|
||||
rs.updateTimestampToNow();
|
||||
}
|
||||
}
|
||||
if (rs == null) continue;
|
||||
rs.updateTimestampToNow();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1293,11 +1316,9 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
* @param regionInfo
|
||||
*/
|
||||
public void regionOffline(final HRegionInfo regionInfo) {
|
||||
synchronized(this.regionsInTransition) {
|
||||
if (this.regionsInTransition.remove(regionInfo.getEncodedName()) != null) {
|
||||
this.regionsInTransition.notifyAll();
|
||||
}
|
||||
}
|
||||
// no lock concurrency ok
|
||||
this.regionsInTransition.remove(regionInfo.getEncodedName());
|
||||
|
||||
// remove the region plan as well just in case.
|
||||
clearRegionPlan(regionInfo);
|
||||
setOffline(regionInfo);
|
||||
|
@ -1409,14 +1430,12 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
destination.toString());
|
||||
|
||||
List<RegionState> states = new ArrayList<RegionState>(regions.size());
|
||||
synchronized (this.regionsInTransition) {
|
||||
for (HRegionInfo region: regions) {
|
||||
states.add(forceRegionStateToOffline(region));
|
||||
}
|
||||
for (HRegionInfo region : regions) {
|
||||
states.add(forceRegionStateToOffline(region));
|
||||
}
|
||||
// Add region plans, so we can updateTimers when one region is opened so
|
||||
// that unnecessary timeout on RIT is reduced.
|
||||
Map<String, RegionPlan> plans=new HashMap<String, RegionPlan>();
|
||||
Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
|
||||
for (HRegionInfo region : regions) {
|
||||
plans.put(region.getEncodedName(), new RegionPlan(region, null,
|
||||
destination));
|
||||
|
@ -1610,13 +1629,10 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
*/
|
||||
private RegionState addToRegionsInTransition(final HRegionInfo region,
|
||||
boolean hijack) {
|
||||
synchronized (regionsInTransition) {
|
||||
return forceRegionStateToOffline(region, hijack);
|
||||
}
|
||||
return forceRegionStateToOffline(region, hijack);
|
||||
}
|
||||
/**
|
||||
* Sets regions {@link RegionState} to {@link RegionState.State#OFFLINE}.
|
||||
* Caller must hold lock on this.regionsInTransition.
|
||||
* @param region
|
||||
* @return Amended RegionState.
|
||||
*/
|
||||
|
@ -1626,7 +1642,6 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
|
||||
/**
|
||||
* Sets regions {@link RegionState} to {@link RegionState.State#OFFLINE}.
|
||||
* Caller must hold lock on this.regionsInTransition.
|
||||
* @param region
|
||||
* @param hijack
|
||||
* @return Amended RegionState.
|
||||
|
@ -1634,23 +1649,29 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
private RegionState forceRegionStateToOffline(final HRegionInfo region,
|
||||
boolean hijack) {
|
||||
String encodedName = region.getEncodedName();
|
||||
RegionState state = this.regionsInTransition.get(encodedName);
|
||||
if (state == null) {
|
||||
state = new RegionState(region, RegionState.State.OFFLINE);
|
||||
this.regionsInTransition.put(encodedName, state);
|
||||
} else {
|
||||
// If we are reassigning the node do not force in-memory state to OFFLINE.
|
||||
// Based on the znode state we will decide if to change in-memory state to
|
||||
// OFFLINE or not. It will be done before setting znode to OFFLINE state.
|
||||
|
||||
// We often get here with state == CLOSED because ClosedRegionHandler will
|
||||
// assign on its tail as part of the handling of a region close.
|
||||
if (!hijack) {
|
||||
LOG.debug("Forcing OFFLINE; was=" + state);
|
||||
state.update(RegionState.State.OFFLINE);
|
||||
Lock lock = locker.acquireLock(encodedName);
|
||||
try {
|
||||
RegionState state = this.regionsInTransition.get(encodedName);
|
||||
if (state == null) {
|
||||
state = new RegionState(region, RegionState.State.OFFLINE);
|
||||
this.regionsInTransition.put(encodedName, state);
|
||||
} else {
|
||||
// If we are reassigning the node do not force in-memory state to OFFLINE.
|
||||
// Based on the znode state we will decide if to change in-memory state to
|
||||
// OFFLINE or not. It will be done before setting znode to OFFLINE state.
|
||||
|
||||
// We often get here with state == CLOSED because ClosedRegionHandler will
|
||||
// assign on its tail as part of the handling of a region close.
|
||||
if (!hijack) {
|
||||
LOG.debug("Forcing OFFLINE; was=" + state);
|
||||
state.update(RegionState.State.OFFLINE);
|
||||
}
|
||||
}
|
||||
return state;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
return state;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1718,10 +1739,9 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
"Error deleting OFFLINED node in ZK for transition ZK node ("
|
||||
+ encodedRegionName + ")", e);
|
||||
}
|
||||
synchronized (this.regionsInTransition) {
|
||||
this.regionsInTransition.remove(plan.getRegionInfo()
|
||||
.getEncodedName());
|
||||
}
|
||||
// no lock concurrent ok -> sequentially consistent
|
||||
this.regionsInTransition.remove(plan.getRegionInfo().getEncodedName());
|
||||
|
||||
synchronized (this.regions) {
|
||||
this.regions.put(plan.getRegionInfo(), plan.getDestination());
|
||||
addToServers(plan.getDestination(), plan.getRegionInfo());
|
||||
|
@ -1979,6 +1999,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
unassign(region, false);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Unassigns the specified region.
|
||||
* <p>
|
||||
|
@ -2011,10 +2032,13 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
// Grab the state of this region and synchronize on it
|
||||
RegionState state;
|
||||
int versionOfClosingNode = -1;
|
||||
synchronized (regionsInTransition) {
|
||||
// We need a lock here as we're going to do a put later and we don't want multiple states
|
||||
// creation
|
||||
ReentrantLock lock = locker.acquireLock(encodedName);
|
||||
try {
|
||||
state = regionsInTransition.get(encodedName);
|
||||
if (state == null) {
|
||||
// Create the znode in CLOSING state
|
||||
// Create the znode in CLOSING state
|
||||
try {
|
||||
versionOfClosingNode = ZKAssign.createNodeClosing(
|
||||
master.getZooKeeper(), region, master.getServerName());
|
||||
|
@ -2067,7 +2091,10 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
"already in transition (" + state.getState() + ", force=" + force + ")");
|
||||
return;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
// Send CLOSE RPC
|
||||
ServerName server = null;
|
||||
synchronized (this.regions) {
|
||||
|
@ -2076,21 +2103,25 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
// ClosedRegionhandler can remove the server from this.regions
|
||||
if (server == null) {
|
||||
// Possibility of disable flow removing from RIT.
|
||||
synchronized (regionsInTransition) {
|
||||
lock = locker.acquireLock(encodedName);
|
||||
try {
|
||||
state = regionsInTransition.get(encodedName);
|
||||
if (state != null) {
|
||||
// remove only if the state is PENDING_CLOSE or CLOSING
|
||||
State presentState = state.getState();
|
||||
if (presentState == State.PENDING_CLOSE
|
||||
|| presentState == State.CLOSING) {
|
||||
RegionState.State presentState = state.getState();
|
||||
if (presentState == RegionState.State.PENDING_CLOSE
|
||||
|| presentState == RegionState.State.CLOSING) {
|
||||
this.regionsInTransition.remove(encodedName);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
// delete the node. if no node exists need not bother.
|
||||
deleteClosingOrClosedNode(region);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// TODO: We should consider making this look more like it does for the
|
||||
// region open where we catch all throwables and never abort
|
||||
|
@ -2117,9 +2148,8 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
+ region.getTableNameAsString()
|
||||
+ " to DISABLED state the region " + region
|
||||
+ " was offlined but the table was in DISABLING state");
|
||||
synchronized (this.regionsInTransition) {
|
||||
this.regionsInTransition.remove(region.getEncodedName());
|
||||
}
|
||||
|
||||
// Remove from the regionsMap
|
||||
synchronized (this.regions) {
|
||||
this.regions.remove(region);
|
||||
|
@ -2388,6 +2418,12 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param timeout How long to wait.
|
||||
* @return true if done.
|
||||
*/
|
||||
@Override
|
||||
protected boolean waitUntilDone(final long timeout)
|
||||
throws InterruptedException {
|
||||
Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
|
||||
|
@ -2467,50 +2503,43 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
// that if it returns without an exception that there was a period of time
|
||||
// with no regions in transition from the point-of-view of the in-memory
|
||||
// state of the Master.
|
||||
long startTime = System.currentTimeMillis();
|
||||
long remaining = timeout;
|
||||
synchronized (regionsInTransition) {
|
||||
while (regionsInTransition.size() > 0 && !this.master.isStopped()
|
||||
&& remaining > 0) {
|
||||
regionsInTransition.wait(remaining);
|
||||
remaining = timeout - (System.currentTimeMillis() - startTime);
|
||||
}
|
||||
final long endTime = System.currentTimeMillis() + timeout;
|
||||
|
||||
while (!this.master.isStopped() && !regionsInTransition.isEmpty() &&
|
||||
endTime > System.currentTimeMillis()) {
|
||||
regionsInTransition.waitForUpdate(100);
|
||||
}
|
||||
|
||||
return regionsInTransition.isEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until no regions from set regions are in transition.
|
||||
* @param timeout How long to wait.
|
||||
* @param regions set of regions to wait for
|
||||
* @return True if nothing in regions in transition.
|
||||
* @param regions set of regions to wait for. It will be modified by this method.
|
||||
* @return True if none of the regions in the set is in transition
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
boolean waitUntilNoRegionsInTransition(final long timeout, Set<HRegionInfo> regions)
|
||||
throws InterruptedException {
|
||||
// Blocks until there are no regions in transition.
|
||||
long startTime = System.currentTimeMillis();
|
||||
long remaining = timeout;
|
||||
boolean stillInTransition = true;
|
||||
synchronized (regionsInTransition) {
|
||||
while (regionsInTransition.size() > 0 && !this.master.isStopped() &&
|
||||
remaining > 0 && stillInTransition) {
|
||||
int count = 0;
|
||||
for (RegionState rs : regionsInTransition.values()) {
|
||||
if (regions.contains(rs.getRegion())) {
|
||||
count++;
|
||||
break;
|
||||
}
|
||||
throws InterruptedException {
|
||||
final long endTime = System.currentTimeMillis() + timeout;
|
||||
|
||||
// We're not synchronizing on regionsInTransition now because we don't use any iterator.
|
||||
while (!regions.isEmpty() && !this.master.isStopped() && endTime > System.currentTimeMillis()) {
|
||||
Iterator<HRegionInfo> regionInfoIterator = regions.iterator();
|
||||
while (regionInfoIterator.hasNext()) {
|
||||
HRegionInfo hri = regionInfoIterator.next();
|
||||
if (!regionsInTransition.containsKey(hri.getEncodedName())) {
|
||||
regionInfoIterator.remove();
|
||||
}
|
||||
if (count == 0) {
|
||||
stillInTransition = false;
|
||||
break;
|
||||
}
|
||||
regionsInTransition.wait(remaining);
|
||||
remaining = timeout - (System.currentTimeMillis() - startTime);
|
||||
}
|
||||
|
||||
if (!regions.isEmpty()) {
|
||||
regionsInTransition.waitForUpdate(100);
|
||||
}
|
||||
}
|
||||
return stillInTransition;
|
||||
|
||||
return regions.isEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2810,10 +2839,13 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
/**
|
||||
* @return A copy of the Map of regions currently in transition.
|
||||
*/
|
||||
public NavigableMap<String, RegionState> getRegionsInTransition() {
|
||||
synchronized (this.regionsInTransition) {
|
||||
return new TreeMap<String, RegionState>(this.regionsInTransition);
|
||||
}
|
||||
public NavigableMap<String, RegionState> copyRegionsInTransition() {
|
||||
// no lock concurrent access ok
|
||||
return regionsInTransition.copyMap();
|
||||
}
|
||||
|
||||
NotifiableConcurrentSkipListMap<String, RegionState> getRegionsInTransition() {
|
||||
return regionsInTransition;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2830,8 +2862,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
long oldestRITTime = 0;
|
||||
int ritThreshold = this.master.getConfiguration().
|
||||
getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
|
||||
for (Map.Entry<String, RegionState> e : this.regionsInTransition.
|
||||
entrySet()) {
|
||||
for (Map.Entry<String, RegionState> e : this.regionsInTransition.copyEntrySet()) {
|
||||
totalRITs++;
|
||||
long ritTime = currentTime - e.getValue().getStamp();
|
||||
if (ritTime > ritThreshold) { // more than the threshold
|
||||
|
@ -2852,9 +2883,9 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
* @return True if regions in transition.
|
||||
*/
|
||||
public boolean isRegionsInTransition() {
|
||||
synchronized (this.regionsInTransition) {
|
||||
return !this.regionsInTransition.isEmpty();
|
||||
}
|
||||
// no lock concurrent access ok: we could imagine that someone is currently going to remove
|
||||
// it or add a region, but it's sequentially consistent.
|
||||
return !(this.regionsInTransition.isEmpty());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2863,9 +2894,9 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
* RegionState
|
||||
*/
|
||||
public RegionState isRegionInTransition(final HRegionInfo hri) {
|
||||
synchronized (this.regionsInTransition) {
|
||||
return this.regionsInTransition.get(hri.getEncodedName());
|
||||
}
|
||||
// no lock concurrent access ok: we could imagine that someone is currently going to remove
|
||||
// it or add it, but it's sequentially consistent.
|
||||
return this.regionsInTransition.get(hri.getEncodedName());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -3004,18 +3035,16 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
if (this.bulkAssign) return;
|
||||
boolean noRSAvailable = this.serverManager.createDestinationServersList().isEmpty();
|
||||
|
||||
synchronized (regionsInTransition) {
|
||||
// Iterate all regions in transition checking for time outs
|
||||
long now = System.currentTimeMillis();
|
||||
for (RegionState regionState : regionsInTransition.values()) {
|
||||
if (regionState.getStamp() + timeout <= now) {
|
||||
//decide on action upon timeout
|
||||
actOnTimeOut(regionState);
|
||||
} else if (this.allRegionServersOffline && !noRSAvailable) {
|
||||
// if some RSs just came back online, we can start the
|
||||
// the assignment right away
|
||||
actOnTimeOut(regionState);
|
||||
}
|
||||
// Iterate all regions in transition checking for time outs
|
||||
long now = System.currentTimeMillis();
|
||||
// no lock concurrent access ok: we will be working on a copy, and it's java-valid to do
|
||||
// a copy while another thread is adding/removing items
|
||||
for (RegionState regionState : regionsInTransition.copyValues()) {
|
||||
if (regionState.getStamp() + timeout <= now ||
|
||||
(this.allRegionServersOffline && !noRSAvailable)) {
|
||||
//decide on action upon timeout or, if some RSs just came back online, we can start the
|
||||
// the assignment
|
||||
actOnTimeOut(regionState);
|
||||
}
|
||||
}
|
||||
setAllRegionServersOffline(noRSAvailable);
|
||||
|
@ -3203,11 +3232,11 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
// See if any of the regions that were online on this server were in RIT
|
||||
// If they are, normal timeouts will deal with them appropriately so
|
||||
// let's skip a manual re-assignment.
|
||||
synchronized (regionsInTransition) {
|
||||
for (RegionState region : this.regionsInTransition.values()) {
|
||||
if (deadRegions.remove(region.getRegion())) {
|
||||
rits.add(region);
|
||||
}
|
||||
// no lock concurrent access ok: we will be working on a copy, and it's java-valid to do
|
||||
// a copy while another thread is adding/removing items
|
||||
for (RegionState region : this.regionsInTransition.copyValues()) {
|
||||
if (deadRegions.remove(region.getRegion())) {
|
||||
rits.add(region);
|
||||
}
|
||||
}
|
||||
return rits;
|
||||
|
|
|
@ -1560,7 +1560,7 @@ Server {
|
|||
this.serverManager.getDeadServers(),
|
||||
this.serverName,
|
||||
backupMasters,
|
||||
this.assignmentManager.getRegionsInTransition(),
|
||||
this.assignmentManager.copyRegionsInTransition(),
|
||||
this.getCoprocessors());
|
||||
}
|
||||
|
||||
|
|
|
@ -100,7 +100,7 @@ public class MXBeanImpl implements MXBean {
|
|||
List<RegionsInTransitionInfo> info =
|
||||
new ArrayList<RegionsInTransitionInfo>();
|
||||
for (final Entry<String, RegionState> entry :
|
||||
master.getAssignmentManager().getRegionsInTransition().entrySet()) {
|
||||
master.getAssignmentManager().copyRegionsInTransition().entrySet()) {
|
||||
RegionsInTransitionInfo innerinfo = new RegionsInTransitionInfo() {
|
||||
|
||||
@Override
|
||||
|
|
|
@ -105,7 +105,7 @@ public class MasterDumpServlet extends StateDumpServlet {
|
|||
|
||||
private void dumpRIT(HMaster master, PrintWriter out) {
|
||||
NavigableMap<String, RegionState> regionsInTransition =
|
||||
master.getAssignmentManager().getRegionsInTransition();
|
||||
master.getAssignmentManager().copyRegionsInTransition();
|
||||
for (Map.Entry<String, RegionState> e : regionsInTransition.entrySet()) {
|
||||
String rid = e.getKey();
|
||||
RegionState rs = e.getValue();
|
||||
|
|
|
@ -0,0 +1,123 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
|
||||
/**
|
||||
* <p>Encapsulate a ConcurrentSkipListMap to ensure that notifications are sent when
|
||||
* the list is modified. Offers only the functions used by the AssignementManager, hence
|
||||
* does not extends ConcurrentSkipListMap.</p>
|
||||
*
|
||||
* <p>Used only in master package (main & test), so it's package protected.</p>
|
||||
*
|
||||
* @param <K> - class for the keys
|
||||
* @param <V> - class for the values
|
||||
*/
|
||||
class NotifiableConcurrentSkipListMap<K, V> {
|
||||
private final ConcurrentSkipListMap<K, V> delegatee = new ConcurrentSkipListMap<K, V>();
|
||||
|
||||
public boolean isEmpty() {
|
||||
return delegatee.isEmpty();
|
||||
}
|
||||
|
||||
public int size() {
|
||||
return delegatee.size();
|
||||
}
|
||||
|
||||
public void put(K k, V v) {
|
||||
synchronized (delegatee) {
|
||||
delegatee.put(k, v);
|
||||
delegatee.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
public V remove(K k) {
|
||||
synchronized (delegatee) {
|
||||
V v = delegatee.remove(k);
|
||||
if (v != null) {
|
||||
delegatee.notifyAll();
|
||||
}
|
||||
return v;
|
||||
}
|
||||
}
|
||||
|
||||
public void waitForUpdate(long timeout) throws InterruptedException {
|
||||
synchronized (delegatee){
|
||||
delegatee.wait(timeout);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean containsKey(K k) {
|
||||
return delegatee.containsKey(k);
|
||||
}
|
||||
|
||||
public Collection<?> keySet() {
|
||||
return delegatee.keySet();
|
||||
}
|
||||
|
||||
public V get(K k) {
|
||||
return delegatee.get(k);
|
||||
}
|
||||
|
||||
public NavigableMap<K, V> copyMap() {
|
||||
return delegatee.clone();
|
||||
}
|
||||
|
||||
public Collection<V> copyValues() {
|
||||
Collection<V> values = new ArrayList<V>(size());
|
||||
synchronized (delegatee) {
|
||||
values.addAll(delegatee.values());
|
||||
}
|
||||
return values;
|
||||
}
|
||||
|
||||
public Set<Map.Entry<K, V>> copyEntrySet() {
|
||||
Set<Map.Entry<K, V>> entrySet = new TreeSet<Map.Entry<K, V>>();
|
||||
synchronized (delegatee) {
|
||||
Iterator<Map.Entry<K, V>> it = delegatee.entrySet().iterator();
|
||||
while (it.hasNext()) {
|
||||
entrySet.add(it.next());
|
||||
}
|
||||
}
|
||||
return entrySet;
|
||||
}
|
||||
|
||||
public void waitForUpdate() throws InterruptedException {
|
||||
synchronized (delegatee) {
|
||||
delegatee.wait();
|
||||
}
|
||||
}
|
||||
|
||||
public void clear() {
|
||||
if (!delegatee.isEmpty()) {
|
||||
synchronized (delegatee) {
|
||||
delegatee.clear();
|
||||
delegatee.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,114 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
/**
|
||||
* A utility class to manage a set of locks. Each lock is identified by a String which serves
|
||||
* as a key. Typical usage is: <p>
|
||||
* class Example{
|
||||
* private final static KeyLocker<String> locker = new Locker<String>();
|
||||
* <p/>
|
||||
* public void foo(String s){
|
||||
* Lock lock = locker.acquireLock(s);
|
||||
* try {
|
||||
* // whatever
|
||||
* }finally{
|
||||
* lock.unlock();
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
* </p>
|
||||
*/
|
||||
public class KeyLocker<K> {
|
||||
private static final Log LOG = LogFactory.getLog(KeyLocker.class);
|
||||
|
||||
// The number of lock we want to easily support. It's not a maximum.
|
||||
private static final int NB_CONCURRENT_LOCKS = 1000;
|
||||
|
||||
// We need an atomic counter to manage the number of users using the lock and free it when
|
||||
// it's equal to zero.
|
||||
private final Map<K, Pair<KeyLock<K>, AtomicInteger>> locks =
|
||||
new HashMap<K, Pair<KeyLock<K>, AtomicInteger>>(NB_CONCURRENT_LOCKS);
|
||||
|
||||
/**
|
||||
* Return a lock for the given key. The lock is already locked.
|
||||
*
|
||||
* @param key
|
||||
*/
|
||||
public ReentrantLock acquireLock(K key) {
|
||||
if (key == null) throw new IllegalArgumentException("key must not be null");
|
||||
|
||||
Pair<KeyLock<K>, AtomicInteger> lock;
|
||||
synchronized (this) {
|
||||
lock = locks.get(key);
|
||||
if (lock == null) {
|
||||
lock = new Pair<KeyLock<K>, AtomicInteger>(
|
||||
new KeyLock<K>(this, key), new AtomicInteger(1));
|
||||
locks.put(key, lock);
|
||||
} else {
|
||||
lock.getSecond().incrementAndGet();
|
||||
}
|
||||
}
|
||||
lock.getFirst().lock();
|
||||
return lock.getFirst();
|
||||
}
|
||||
|
||||
/**
|
||||
* Free the lock for the given key.
|
||||
*/
|
||||
private synchronized void releaseLock(K key) {
|
||||
Pair<KeyLock<K>, AtomicInteger> lock = locks.get(key);
|
||||
if (lock != null) {
|
||||
if (lock.getSecond().decrementAndGet() == 0) {
|
||||
locks.remove(key);
|
||||
}
|
||||
} else {
|
||||
String message = "Can't release the lock for " + key+", this key is not in the key list." +
|
||||
" known keys are: "+ locks.keySet();
|
||||
LOG.error(message);
|
||||
throw new RuntimeException(message);
|
||||
}
|
||||
}
|
||||
|
||||
static class KeyLock<K> extends ReentrantLock {
|
||||
private final KeyLocker<K> locker;
|
||||
private final K lockId;
|
||||
|
||||
private KeyLock(KeyLocker<K> locker, K lockId) {
|
||||
super();
|
||||
this.locker = locker;
|
||||
this.lockId = lockId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unlock() {
|
||||
super.unlock();
|
||||
locker.releaseLock(lockId);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -185,6 +185,7 @@ public class TestDrainingServer {
|
|||
*/
|
||||
@Test (timeout=30000)
|
||||
public void testDrainingServerWithAbort() throws KeeperException, Exception {
|
||||
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
|
||||
|
||||
// Ensure a stable env
|
||||
TEST_UTIL.getHBaseAdmin().balanceSwitch(false);
|
||||
|
@ -204,7 +205,7 @@ public class TestDrainingServer {
|
|||
final int regionsOnDrainingServer = drainingServer.getNumberOfOnlineRegions();
|
||||
Assert.assertTrue(regionsOnDrainingServer > 0);
|
||||
|
||||
ServerManager sm = TEST_UTIL.getHBaseCluster().getMaster().getServerManager();
|
||||
ServerManager sm = master.getServerManager();
|
||||
|
||||
Collection<HRegion> regionsBefore = drainingServer.
|
||||
getCopyOfOnlineRegionsSortedBySize().values();
|
||||
|
@ -221,9 +222,9 @@ public class TestDrainingServer {
|
|||
|
||||
Assert.assertEquals("Nothing should have happened here.", regionsOnDrainingServer,
|
||||
drainingServer.getNumberOfOnlineRegions());
|
||||
Assert.assertTrue("We should not have regions in transition here.",
|
||||
TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().
|
||||
getRegionsInTransition().isEmpty() );
|
||||
Assert.assertFalse("We should not have regions in transition here. List is: "+
|
||||
master.getAssignmentManager().copyRegionsInTransition(),
|
||||
master.getAssignmentManager().isRegionsInTransition() );
|
||||
|
||||
// Kill a few regionservers.
|
||||
for (int aborted = 0; aborted <= 2; aborted++) {
|
||||
|
|
|
@ -158,8 +158,8 @@ public class TestHCM {
|
|||
assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
|
||||
|
||||
// We can wait for all regions to be onlines, that makes log reading easier when debugging
|
||||
while (!TEST_UTIL.getMiniHBaseCluster().getMaster().
|
||||
getAssignmentManager().getRegionsInTransition().isEmpty()) {
|
||||
while (TEST_UTIL.getMiniHBaseCluster().getMaster().
|
||||
getAssignmentManager().isRegionsInTransition()) {
|
||||
}
|
||||
|
||||
// Now moving the region to the second server
|
||||
|
|
|
@ -974,7 +974,7 @@ public class TestMasterObserver {
|
|||
// wait for assignments to finish
|
||||
AssignmentManager mgr = master.getAssignmentManager();
|
||||
Collection<AssignmentManager.RegionState> transRegions =
|
||||
mgr.getRegionsInTransition().values();
|
||||
mgr.copyRegionsInTransition().values();
|
||||
for (AssignmentManager.RegionState state : transRegions) {
|
||||
mgr.waitOnRegionToClearRegionsInTransition(state.getRegion());
|
||||
}
|
||||
|
|
|
@ -67,23 +67,43 @@ public class Mocking {
|
|||
Bytes.toBytes(sn.getStartcode())));
|
||||
return new Result(kvs);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* @param sn
|
||||
* ServerName to use making startcode and server in meta
|
||||
* @param hri
|
||||
* Region to serialize into HRegionInfo
|
||||
* @param sn ServerName to use making startcode and server in meta
|
||||
* @param hri Region to serialize into HRegionInfo
|
||||
* @return A mocked up Result that fakes a Get on a row in the <code>.META.</code> table.
|
||||
* @throws IOException
|
||||
*/
|
||||
static Result getMetaTableRowResultAsSplitRegion(final HRegionInfo hri, final ServerName sn)
|
||||
throws IOException {
|
||||
throws IOException {
|
||||
hri.setOffline(true);
|
||||
hri.setSplit(true);
|
||||
return getMetaTableRowResult(hri, sn);
|
||||
}
|
||||
|
||||
|
||||
static void waitForRegionPendingOpenInRIT(AssignmentManager am, String encodedName)
|
||||
throws InterruptedException {
|
||||
// We used to do a check like this:
|
||||
//!Mocking.verifyRegionState(this.watcher, REGIONINFO, EventType.M_ZK_REGION_OFFLINE)) {
|
||||
// There is a race condition with this: because we may do the transition to
|
||||
// RS_ZK_REGION_OPENING before the RIT is internally updated. We need to wait for the
|
||||
// RIT to be as we need it to be instead. This cannot happen in a real cluster as we
|
||||
// update the RIT before sending the openRegion request.
|
||||
|
||||
boolean wait = true;
|
||||
while (wait) {
|
||||
AssignmentManager.RegionState state = am.getRegionsInTransition().get(encodedName);
|
||||
if (state != null && state.isPendingOpen()){
|
||||
wait = false;
|
||||
} else {
|
||||
Thread.sleep(1);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Fakes the regionserver-side zk transitions of a region open.
|
||||
* @param w ZooKeeperWatcher to use.
|
||||
|
@ -92,13 +112,12 @@ public class Mocking {
|
|||
* @throws KeeperException
|
||||
* @throws DeserializationException
|
||||
*/
|
||||
static void fakeRegionServerRegionOpenInZK(final ZooKeeperWatcher w,
|
||||
static void fakeRegionServerRegionOpenInZK(HMaster master, final ZooKeeperWatcher w,
|
||||
final ServerName sn, final HRegionInfo hri)
|
||||
throws KeeperException, DeserializationException {
|
||||
// Wait till we see the OFFLINE zk node before we proceed.
|
||||
while (!verifyRegionState(w, hri, EventType.M_ZK_REGION_OFFLINE)) {
|
||||
Threads.sleep(1);
|
||||
}
|
||||
throws KeeperException, DeserializationException, InterruptedException {
|
||||
// Wait till the we region is ready to be open in RIT.
|
||||
waitForRegionPendingOpenInRIT(master.getAssignmentManager(), hri.getEncodedName());
|
||||
|
||||
// Get current versionid else will fail on transition from OFFLINE to OPENING below
|
||||
int versionid = ZKAssign.getVersion(w, hri);
|
||||
assertNotSame(-1, versionid);
|
||||
|
|
|
@ -191,10 +191,8 @@ public class TestAssignmentManager {
|
|||
int versionid =
|
||||
ZKAssign.transitionNodeClosed(this.watcher, REGIONINFO, SERVERNAME_A, -1);
|
||||
assertNotSame(versionid, -1);
|
||||
while (!Mocking.verifyRegionState(this.watcher, REGIONINFO,
|
||||
EventType.M_ZK_REGION_OFFLINE)) {
|
||||
Threads.sleep(1);
|
||||
}
|
||||
Mocking.waitForRegionPendingOpenInRIT(am, REGIONINFO.getEncodedName());
|
||||
|
||||
// Get current versionid else will fail on transition from OFFLINE to
|
||||
// OPENING below
|
||||
versionid = ZKAssign.getVersion(this.watcher, REGIONINFO);
|
||||
|
@ -235,10 +233,8 @@ public class TestAssignmentManager {
|
|||
ZKAssign.transitionNodeClosed(this.watcher, REGIONINFO, SERVERNAME_A, -1);
|
||||
assertNotSame(versionid, -1);
|
||||
am.gate.set(false);
|
||||
while (!Mocking.verifyRegionState(this.watcher, REGIONINFO,
|
||||
EventType.M_ZK_REGION_OFFLINE)) {
|
||||
Threads.sleep(1);
|
||||
}
|
||||
Mocking.waitForRegionPendingOpenInRIT(am, REGIONINFO.getEncodedName());
|
||||
|
||||
// Get current versionid else will fail on transition from OFFLINE to
|
||||
// OPENING below
|
||||
versionid = ZKAssign.getVersion(this.watcher, REGIONINFO);
|
||||
|
@ -278,10 +274,8 @@ public class TestAssignmentManager {
|
|||
int versionid =
|
||||
ZKAssign.transitionNodeClosed(this.watcher, REGIONINFO, SERVERNAME_A, -1);
|
||||
assertNotSame(versionid, -1);
|
||||
while (!Mocking.verifyRegionState(this.watcher, REGIONINFO,
|
||||
EventType.M_ZK_REGION_OFFLINE)) {
|
||||
Threads.sleep(1);
|
||||
}
|
||||
Mocking.waitForRegionPendingOpenInRIT(am, REGIONINFO.getEncodedName());
|
||||
|
||||
am.gate.set(false);
|
||||
// Get current versionid else will fail on transition from OFFLINE to
|
||||
// OPENING below
|
||||
|
@ -324,7 +318,7 @@ public class TestAssignmentManager {
|
|||
*/
|
||||
@Test
|
||||
public void testBalance()
|
||||
throws IOException, KeeperException, DeserializationException {
|
||||
throws IOException, KeeperException, DeserializationException, InterruptedException {
|
||||
// Create and startup an executor. This is used by AssignmentManager
|
||||
// handling zk callbacks.
|
||||
ExecutorService executor = startupMasterExecutor("testBalanceExecutor");
|
||||
|
@ -360,9 +354,8 @@ public class TestAssignmentManager {
|
|||
// balancer. The zk node will be OFFLINE waiting for regionserver to
|
||||
// transition it through OPENING, OPENED. Wait till we see the OFFLINE
|
||||
// zk node before we proceed.
|
||||
while (!Mocking.verifyRegionState(this.watcher, REGIONINFO, EventType.M_ZK_REGION_OFFLINE)) {
|
||||
Threads.sleep(1);
|
||||
}
|
||||
Mocking.waitForRegionPendingOpenInRIT(am, REGIONINFO.getEncodedName());
|
||||
|
||||
// Get current versionid else will fail on transition from OFFLINE to OPENING below
|
||||
versionid = ZKAssign.getVersion(this.watcher, REGIONINFO);
|
||||
assertNotSame(-1, versionid);
|
||||
|
@ -686,12 +679,12 @@ public class TestAssignmentManager {
|
|||
@Test
|
||||
public void testRegionPlanIsUpdatedWhenRegionFailsToOpen() throws IOException, KeeperException,
|
||||
ServiceException, InterruptedException {
|
||||
this.server.getConfiguration().setClass(
|
||||
HConstants.HBASE_MASTER_LOADBALANCER_CLASS, MockedLoadBalancer.class,
|
||||
LoadBalancer.class);
|
||||
AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(
|
||||
this.server, this.serverManager);
|
||||
try {
|
||||
this.server.getConfiguration().setClass(
|
||||
HConstants.HBASE_MASTER_LOADBALANCER_CLASS, MockedLoadBalancer.class,
|
||||
LoadBalancer.class);
|
||||
AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(
|
||||
this.server, this.serverManager);
|
||||
// Boolean variable used for waiting until randomAssignment is called and
|
||||
// new
|
||||
// plan is generated.
|
||||
|
@ -738,12 +731,16 @@ public class TestAssignmentManager {
|
|||
// be new plan.
|
||||
assertNotSame("Same region plan should not come", regionPlan,
|
||||
newRegionPlan);
|
||||
assertTrue("Destnation servers should be different.", !(regionPlan
|
||||
assertTrue("Destination servers should be different.", !(regionPlan
|
||||
.getDestination().equals(newRegionPlan.getDestination())));
|
||||
|
||||
Mocking.waitForRegionPendingOpenInRIT(am, REGIONINFO.getEncodedName());
|
||||
} finally {
|
||||
this.server.getConfiguration().setClass(
|
||||
HConstants.HBASE_MASTER_LOADBALANCER_CLASS, DefaultLoadBalancer.class,
|
||||
LoadBalancer.class);
|
||||
am.getExecutorService().shutdown();
|
||||
am.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -808,7 +808,7 @@ public class TestMasterFailover {
|
|||
final long maxTime = 120000;
|
||||
boolean done = master.assignmentManager.waitUntilNoRegionsInTransition(maxTime);
|
||||
if (!done) {
|
||||
LOG.info("rit=" + master.assignmentManager.getRegionsInTransition());
|
||||
LOG.info("rit=" + master.assignmentManager.copyRegionsInTransition());
|
||||
}
|
||||
long elapsed = System.currentTimeMillis() - now;
|
||||
assertTrue("Elapsed=" + elapsed + ", maxTime=" + maxTime + ", done=" + done,
|
||||
|
|
|
@ -60,6 +60,8 @@ import org.junit.BeforeClass;
|
|||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Standup the master and fake it to test various aspects of master function.
|
||||
|
@ -71,6 +73,7 @@ import org.junit.experimental.categories.Category;
|
|||
*/
|
||||
@Category(MediumTests.class)
|
||||
public class TestMasterNoCluster {
|
||||
private static Logger LOG = LoggerFactory.getLogger(TestMasterNoCluster.class);
|
||||
private static final HBaseTestingUtility TESTUTIL = new HBaseTestingUtility();
|
||||
|
||||
@BeforeClass
|
||||
|
@ -244,6 +247,9 @@ public class TestMasterNoCluster {
|
|||
public void testCatalogDeploys()
|
||||
throws IOException, KeeperException, InterruptedException, DeserializationException, ServiceException {
|
||||
final Configuration conf = TESTUTIL.getConfiguration();
|
||||
conf.setInt("hbase.master.wait.on.regionservers.mintostart", 1);
|
||||
conf.setInt("hbase.master.wait.on.regionservers.maxtostart", 1);
|
||||
|
||||
final long now = System.currentTimeMillis();
|
||||
// Name for our single mocked up regionserver.
|
||||
final ServerName sn = new ServerName("0.example.org", 0, now);
|
||||
|
@ -291,10 +297,13 @@ public class TestMasterNoCluster {
|
|||
}
|
||||
};
|
||||
master.start();
|
||||
LOG.info("Master has started");
|
||||
|
||||
try {
|
||||
// Wait till master is up ready for RPCs.
|
||||
while (!master.isRpcServerOpen()) Threads.sleep(10);
|
||||
LOG.info("RpcServerOpen has started");
|
||||
|
||||
// Fake master that there is a regionserver out there. Report in.
|
||||
RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
|
||||
request.setPort(rs0.getServerName().getPort());
|
||||
|
@ -317,8 +326,11 @@ public class TestMasterNoCluster {
|
|||
// region open, master will have set an unassigned znode for the region up
|
||||
// into zk for the regionserver to transition. Lets do that now to
|
||||
// complete fake of a successful open.
|
||||
Mocking.fakeRegionServerRegionOpenInZK(rs0.getZooKeeper(),
|
||||
Mocking.fakeRegionServerRegionOpenInZK(master, rs0.getZooKeeper(),
|
||||
rs0.getServerName(), HRegionInfo.ROOT_REGIONINFO);
|
||||
LOG.info("fakeRegionServerRegionOpenInZK has started");
|
||||
|
||||
|
||||
// Need to set root location as r1. Usually the regionserver does this
|
||||
// when its figured it just opened the root region by setting the root
|
||||
// location up into zk. Since we're mocking regionserver, need to do this
|
||||
|
@ -326,7 +338,7 @@ public class TestMasterNoCluster {
|
|||
RootRegionTracker.setRootLocation(rs0.getZooKeeper(), rs0.getServerName());
|
||||
// Do same transitions for .META. (presuming master has by now assigned
|
||||
// .META. to rs1).
|
||||
Mocking.fakeRegionServerRegionOpenInZK(rs0.getZooKeeper(),
|
||||
Mocking.fakeRegionServerRegionOpenInZK(master, rs0.getZooKeeper(),
|
||||
rs0.getServerName(), HRegionInfo.FIRST_META_REGIONINFO);
|
||||
// Now trigger our mock regionserver to start returning a row when we
|
||||
// go to get .META. entry in -ROOT-. We do it by setting into
|
||||
|
|
|
@ -34,8 +34,6 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.ServerManager;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.hadoop.hbase.tmpl.master.AssignmentManagerStatusTmpl;
|
||||
|
@ -84,7 +82,7 @@ public class TestMasterStatusServlet {
|
|||
Maps.newTreeMap();
|
||||
regionsInTransition.put("r1",
|
||||
new RegionState(FAKE_HRI, RegionState.State.CLOSING, 12345L, FAKE_HOST));
|
||||
Mockito.doReturn(regionsInTransition).when(am).getRegionsInTransition();
|
||||
Mockito.doReturn(regionsInTransition).when(am).copyRegionsInTransition();
|
||||
Mockito.doReturn(am).when(master).getAssignmentManager();
|
||||
|
||||
// Fake ZKW
|
||||
|
@ -170,7 +168,7 @@ public class TestMasterStatusServlet {
|
|||
HRegionInfo.FIRST_META_REGIONINFO.getEncodedName(),
|
||||
new RegionState(HRegionInfo.FIRST_META_REGIONINFO,
|
||||
RegionState.State.CLOSING, 12345L, FAKE_HOST));
|
||||
Mockito.doReturn(regionsInTransition).when(am).getRegionsInTransition();
|
||||
Mockito.doReturn(regionsInTransition).when(am).copyRegionsInTransition();
|
||||
|
||||
// Render to a string
|
||||
StringWriter sw = new StringWriter();
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import org.apache.hadoop.hbase.SmallTests;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class TestKeyLocker {
|
||||
@Test
|
||||
public void testLocker(){
|
||||
KeyLocker<String> locker = new KeyLocker();
|
||||
ReentrantLock lock1 = locker.acquireLock("l1");
|
||||
Assert.assertTrue(lock1.isHeldByCurrentThread());
|
||||
|
||||
ReentrantLock lock2 = locker.acquireLock("l2");
|
||||
Assert.assertTrue(lock2.isHeldByCurrentThread());
|
||||
Assert.assertTrue(lock1 != lock2);
|
||||
|
||||
// same key = same lock
|
||||
ReentrantLock lock20 = locker.acquireLock("l2");
|
||||
Assert.assertTrue(lock20 == lock2);
|
||||
Assert.assertTrue(lock2.isHeldByCurrentThread());
|
||||
Assert.assertTrue(lock20.isHeldByCurrentThread());
|
||||
|
||||
// Locks are still reentrant; so with 2 acquires we want two unlocks
|
||||
lock20.unlock();
|
||||
Assert.assertTrue(lock20.isHeldByCurrentThread());
|
||||
|
||||
lock2.unlock();
|
||||
Assert.assertFalse(lock20.isHeldByCurrentThread());
|
||||
|
||||
// The lock object was freed once useless, so we're recreating a new one
|
||||
ReentrantLock lock200 = locker.acquireLock("l2");
|
||||
Assert.assertTrue(lock2 != lock200);
|
||||
lock200.unlock();
|
||||
Assert.assertFalse(lock200.isHeldByCurrentThread());
|
||||
|
||||
// first lock is still there
|
||||
Assert.assertTrue(lock1.isHeldByCurrentThread());
|
||||
lock1.unlock();
|
||||
Assert.assertFalse(lock1.isHeldByCurrentThread());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue