HBASE-9095. AssignmentManager's handleRegion should respect the single threaded nature of the processing

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1510799 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Devaraj Das 2013-08-05 23:10:05 +00:00
parent fe938fdc55
commit f06af44932
3 changed files with 63 additions and 186 deletions

View File

@ -159,6 +159,17 @@ public class AssignmentManager extends ZooKeeperListener {
private final ExecutorService executorService;
// For unit tests, keep track of calls to ClosedRegionHandler
private Map<HRegionInfo, AtomicBoolean> closedRegionHandlerCalled =
new HashMap<HRegionInfo, AtomicBoolean>();
// For unit tests, keep track of calls to OpenedRegionHandler
private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled =
new HashMap<HRegionInfo, AtomicBoolean>();
// For unit tests, keep track of calls to SplitRegionHandler
private AtomicBoolean splitRegionHandlerCalled = new AtomicBoolean(false);
//Thread pool executor service for timeout monitor
private java.util.concurrent.ExecutorService threadPoolExecutorService;
@ -836,8 +847,8 @@ public class AssignmentManager extends ZooKeeperListener {
break;
}
// Run handler to do the rest of the SPLIT handling.
this.executorService.submit(new SplitRegionHandler(server, this,
regionState.getRegion(), sn, daughters));
new SplitRegionHandler(server, this, regionState.getRegion(), sn, daughters).process();
splitRegionHandlerCalled.set(true);
break;
case RS_ZK_REGION_MERGING:
@ -872,8 +883,7 @@ public class AssignmentManager extends ZooKeeperListener {
+ merge_a + ", rs_b=" + merge_b);
}
// Run handler to do the rest of the MERGED handling.
this.executorService.submit(new MergedRegionHandler(
server, this, sn, mergeRegions));
new MergedRegionHandler(server, this, sn, mergeRegions).process();
break;
case M_ZK_REGION_CLOSING:
@ -907,8 +917,8 @@ public class AssignmentManager extends ZooKeeperListener {
regionState = regionStates.updateRegionState(rt, RegionState.State.CLOSED);
if (regionState != null) {
removeClosedRegion(regionState.getRegion());
this.executorService.submit(new ClosedRegionHandler(server,
this, regionState.getRegion()));
new ClosedRegionHandler(server, this, regionState.getRegion()).process();
closedRegionHandlerCalled.put(regionState.getRegion(), new AtomicBoolean(true));
}
break;
@ -941,8 +951,7 @@ public class AssignmentManager extends ZooKeeperListener {
// When there are more than one region server a new RS is selected as the
// destination and the same is updated in the regionplan. (HBASE-5546)
getRegionPlan(regionState.getRegion(), sn, true);
this.executorService.submit(new ClosedRegionHandler(server,
this, regionState.getRegion()));
new ClosedRegionHandler(server, this, regionState.getRegion()).process();
}
}
break;
@ -980,8 +989,9 @@ public class AssignmentManager extends ZooKeeperListener {
regionState = regionStates.updateRegionState(rt, RegionState.State.OPEN);
if (regionState != null) {
failedOpenTracker.remove(encodedName); // reset the count, if any
this.executorService.submit(new OpenedRegionHandler(
server, this, regionState.getRegion(), sn, expectedVersion));
new OpenedRegionHandler(
server, this, regionState.getRegion(), sn, expectedVersion).process();
openedRegionHandlerCalled.put(regionState.getRegion(), new AtomicBoolean(true));
}
break;
@ -993,6 +1003,32 @@ public class AssignmentManager extends ZooKeeperListener {
}
}
//For unit tests only
boolean wasClosedHandlerCalled(HRegionInfo hri) {
AtomicBoolean b = closedRegionHandlerCalled.get(hri);
//compareAndSet to be sure that unit tests don't see stale values. Means,
//we will return true exactly once unless the handler code resets to true
//this value.
return b == null ? false : b.compareAndSet(true, false);
}
//For unit tests only
boolean wasOpenedHandlerCalled(HRegionInfo hri) {
AtomicBoolean b = openedRegionHandlerCalled.get(hri);
//compareAndSet to be sure that unit tests don't see stale values. Means,
//we will return true exactly once unless the handler code resets to true
//this value.
return b == null ? false : b.compareAndSet(true, false);
}
//For unit tests only
boolean wasSplitHandlerCalled() {
//compareAndSet to be sure that unit tests don't see stale values. Means,
//we will return true exactly once unless the handler code resets to true
//this value.
return splitRegionHandlerCalled.compareAndSet(true, false);
}
/**
* @return Returns true if this RegionState is splittable; i.e. the
* RegionState is currently in splitting state or pending_close or

View File

@ -24,16 +24,11 @@ import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -86,35 +81,27 @@ public class TestMaster {
tableRegions.get(0).getFirst().getEndKey());
// Now trigger a split and stop when the split is in progress
CountDownLatch split = new CountDownLatch(1);
CountDownLatch proceed = new CountDownLatch(1);
RegionSplitListener list = new RegionSplitListener(split, proceed);
cluster.getMaster().executorService.
registerListener(EventType.RS_ZK_REGION_SPLIT, list);
LOG.info("Splitting table");
TEST_UTIL.getHBaseAdmin().split(TABLENAME);
LOG.info("Waiting for split result to be about to open");
split.await(60, TimeUnit.SECONDS);
try {
LOG.info("Making sure we can call getTableRegions while opening");
tableRegions = MetaReader.getTableRegionsAndLocations(m.getCatalogTracker(),
while (!m.assignmentManager.wasSplitHandlerCalled()) {
Thread.sleep(100);
}
LOG.info("Making sure we can call getTableRegions while opening");
tableRegions = MetaReader.getTableRegionsAndLocations(m.getCatalogTracker(),
TABLENAME, false);
LOG.info("Regions: " + Joiner.on(',').join(tableRegions));
// We have three regions because one is split-in-progress
assertEquals(3, tableRegions.size());
LOG.info("Making sure we can call getTableRegionClosest while opening");
Pair<HRegionInfo, ServerName> pair =
LOG.info("Regions: " + Joiner.on(',').join(tableRegions));
// We have three regions because one is split-in-progress
assertEquals(3, tableRegions.size());
LOG.info("Making sure we can call getTableRegionClosest while opening");
Pair<HRegionInfo, ServerName> pair =
m.getTableRegionForRow(TABLENAME, Bytes.toBytes("cde"));
LOG.info("Result is: " + pair);
Pair<HRegionInfo, ServerName> tableRegionFromName =
LOG.info("Result is: " + pair);
Pair<HRegionInfo, ServerName> tableRegionFromName =
MetaReader.getRegion(m.getCatalogTracker(),
pair.getFirst().getRegionName());
assertEquals(tableRegionFromName.getFirst(), pair.getFirst());
} finally {
proceed.countDown();
}
assertEquals(tableRegionFromName.getFirst(), pair.getFirst());
}
@Test
@ -175,33 +162,5 @@ public class TestMaster {
TEST_UTIL.deleteTable(tableName);
}
}
static class RegionSplitListener implements EventHandlerListener {
CountDownLatch split, proceed;
public RegionSplitListener(CountDownLatch split, CountDownLatch proceed) {
this.split = split;
this.proceed = proceed;
}
@Override
public void afterProcess(EventHandler event) {
if (event.getEventType() != EventType.RS_ZK_REGION_SPLIT) {
return;
}
try {
split.countDown();
proceed.await(60, TimeUnit.SECONDS);
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
return;
}
@Override
public void beforeProcess(EventHandler event) {
}
}
}

View File

@ -26,7 +26,6 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -43,10 +42,6 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.handler.TotesHRegionInfo;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
@ -116,29 +111,14 @@ public class TestZKBasedOpenCloseRegion {
HRegionInfo hri = getNonMetaRegion(ProtobufUtil.getOnlineRegions(regionServer));
LOG.debug("Asking RS to close region " + hri.getRegionNameAsString());
AtomicBoolean closeEventProcessed = new AtomicBoolean(false);
AtomicBoolean reopenEventProcessed = new AtomicBoolean(false);
EventHandlerListener closeListener =
new ReopenEventListener(hri.getRegionNameAsString(),
closeEventProcessed, EventType.RS_ZK_REGION_CLOSED);
cluster.getMaster().executorService.
registerListener(EventType.RS_ZK_REGION_CLOSED, closeListener);
EventHandlerListener openListener =
new ReopenEventListener(hri.getRegionNameAsString(),
reopenEventProcessed, EventType.RS_ZK_REGION_OPENED);
cluster.getMaster().executorService.
registerListener(EventType.RS_ZK_REGION_OPENED, openListener);
LOG.info("Unassign " + hri.getRegionNameAsString());
cluster.getMaster().assignmentManager.unassign(hri);
while (!closeEventProcessed.get()) {
while (!cluster.getMaster().assignmentManager.wasClosedHandlerCalled(hri)) {
Threads.sleep(100);
}
while (!reopenEventProcessed.get()) {
while (!cluster.getMaster().assignmentManager.wasOpenedHandlerCalled(hri)) {
Threads.sleep(100);
}
@ -157,83 +137,6 @@ public class TestZKBasedOpenCloseRegion {
return hri;
}
public static class ReopenEventListener implements EventHandlerListener {
private static final Log LOG = LogFactory.getLog(ReopenEventListener.class);
String regionName;
AtomicBoolean eventProcessed;
EventType eventType;
public ReopenEventListener(String regionName,
AtomicBoolean eventProcessed, EventType eventType) {
this.regionName = regionName;
this.eventProcessed = eventProcessed;
this.eventType = eventType;
}
@Override
public void beforeProcess(EventHandler event) {
if(event.getEventType() == eventType) {
LOG.info("Received " + eventType + " and beginning to process it");
}
}
@Override
public void afterProcess(EventHandler event) {
LOG.info("afterProcess(" + event + ")");
if(event.getEventType() == eventType) {
LOG.info("Finished processing " + eventType);
String regionName = "";
if(eventType == EventType.RS_ZK_REGION_OPENED) {
TotesHRegionInfo hriCarrier = (TotesHRegionInfo)event;
regionName = hriCarrier.getHRegionInfo().getRegionNameAsString();
} else if(eventType == EventType.RS_ZK_REGION_CLOSED) {
TotesHRegionInfo hriCarrier = (TotesHRegionInfo)event;
regionName = hriCarrier.getHRegionInfo().getRegionNameAsString();
}
if(this.regionName.equals(regionName)) {
eventProcessed.set(true);
}
synchronized(eventProcessed) {
eventProcessed.notifyAll();
}
}
}
}
public static class CloseRegionEventListener implements EventHandlerListener {
private static final Log LOG = LogFactory.getLog(CloseRegionEventListener.class);
String regionToClose;
AtomicBoolean closeEventProcessed;
public CloseRegionEventListener(String regionToClose,
AtomicBoolean closeEventProcessed) {
this.regionToClose = regionToClose;
this.closeEventProcessed = closeEventProcessed;
}
@Override
public void afterProcess(EventHandler event) {
LOG.info("afterProcess(" + event + ")");
if(event.getEventType() == EventType.RS_ZK_REGION_CLOSED) {
LOG.info("Finished processing CLOSE REGION");
TotesHRegionInfo hriCarrier = (TotesHRegionInfo)event;
if (regionToClose.equals(hriCarrier.getHRegionInfo().getRegionNameAsString())) {
LOG.info("Setting closeEventProcessed flag");
closeEventProcessed.set(true);
} else {
LOG.info("Region to close didn't match");
}
}
}
@Override
public void beforeProcess(EventHandler event) {
if(event.getEventType() == EventType.M_RS_CLOSE_REGION) {
LOG.info("Received CLOSE RPC and beginning to process it");
}
}
}
/**
* This test shows how a region won't be able to be assigned to a RS
* if it's already "processing" it.
@ -253,13 +156,6 @@ public class TestZKBasedOpenCloseRegion {
// fake that hr1 is processing the region
hr1.getRegionsInTransitionInRS().putIfAbsent(hri.getEncodedNameAsBytes(), true);
AtomicBoolean reopenEventProcessed = new AtomicBoolean(false);
EventHandlerListener openListener =
new ReopenEventListener(hri.getRegionNameAsString(),
reopenEventProcessed, EventType.RS_ZK_REGION_OPENED);
cluster.getMaster().executorService.
registerListener(EventType.RS_ZK_REGION_OPENED, openListener);
// now ask the master to move the region to hr1, will fail
TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(),
Bytes.toBytes(hr1.getServerName().toString()));
@ -269,22 +165,14 @@ public class TestZKBasedOpenCloseRegion {
// remove the block and reset the boolean
hr1.getRegionsInTransitionInRS().remove(hri.getEncodedNameAsBytes());
reopenEventProcessed.set(false);
// now try moving a region when there is no region in transition.
hri = getNonMetaRegion(ProtobufUtil.getOnlineRegions(hr1));
openListener =
new ReopenEventListener(hri.getRegionNameAsString(),
reopenEventProcessed, EventType.RS_ZK_REGION_OPENED);
cluster.getMaster().executorService.
registerListener(EventType.RS_ZK_REGION_OPENED, openListener);
TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(),
Bytes.toBytes(hr0.getServerName().toString()));
while (!reopenEventProcessed.get()) {
while (!cluster.getMaster().assignmentManager.wasOpenedHandlerCalled(hri)) {
Threads.sleep(100);
}
@ -304,15 +192,9 @@ public class TestZKBasedOpenCloseRegion {
HRegionInfo hri = getNonMetaRegion(ProtobufUtil.getOnlineRegions(regionServer));
LOG.debug("Asking RS to close region " + hri.getRegionNameAsString());
AtomicBoolean closeEventProcessed = new AtomicBoolean(false);
EventHandlerListener listener =
new CloseRegionEventListener(hri.getRegionNameAsString(),
closeEventProcessed);
cluster.getMaster().executorService.registerListener(EventType.RS_ZK_REGION_CLOSED, listener);
cluster.getMaster().assignmentManager.unassign(hri);
while (!closeEventProcessed.get()) {
while (!cluster.getMaster().assignmentManager.wasClosedHandlerCalled(hri)) {
Threads.sleep(100);
}
LOG.info("Done with testCloseRegion");