HBASE-4153 Handle RegionAlreadyInTransitionException in AssignmentManager

(Ramkrishna)


git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1173768 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2011-09-21 17:14:32 +00:00
parent 81f93ced4e
commit 7e6a6538b4
8 changed files with 92 additions and 33 deletions

View File

@ -289,6 +289,8 @@ Release 0.91.0 - Unreleased
HBASE-4400 .META. getting stuck if RS hosting it is dead and znode state is in
RS_ZK_REGION_OPENED (Ramkrishna)
HBASE-3421 Very wide rows -- 30M plus -- cause us OOME (Nate Putnam)
HBASE-4153 Handle RegionAlreadyInTransitionException in AssignmentManager
(Ramkrishna)
IMPROVEMENTS
HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack)

View File

@ -60,9 +60,11 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.RegionTransitionData;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState.State;
import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
@ -79,7 +81,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKTable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
@ -94,6 +95,7 @@ import org.apache.zookeeper.data.Stat;
* Handles existing regions in transition during master failover.
*/
public class AssignmentManager extends ZooKeeperListener {
private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
protected Server master;
@ -162,6 +164,9 @@ public class AssignmentManager extends ZooKeeperListener {
//Thread pool executor service for timeout monitor
private java.util.concurrent.ExecutorService threadPoolExecutorService;
// Marker text searched for in the RegionAlreadyInTransitionException message sent by the RS
private static final String ALREADY_TRANSITIONING = "for the region we are " +
"already trying to ";
/**
* Constructs a new assignment manager.
@ -1449,6 +1454,17 @@ public class AssignmentManager extends ZooKeeperListener {
}
break;
} catch (Throwable t) {
if (t instanceof RemoteException) {
t = ((RemoteException) t).unwrapRemoteException();
if (t instanceof RegionAlreadyInTransitionException) {
String errorMsg = "Failed assignment of " +
state.getRegion().getRegionNameAsString() + " to " +
plan.getDestination() + " as the region was already " +
extractRegionState((RegionAlreadyInTransitionException) t) +
" in the RS " +plan.getDestination();
LOG.error(errorMsg, t);
return;
}
LOG.warn("Failed assignment of " +
state.getRegion().getRegionNameAsString() + " to " +
plan.getDestination() + ", trying to assign elsewhere instead; " +
@ -1465,8 +1481,16 @@ public class AssignmentManager extends ZooKeeperListener {
}
}
}
}
}
/**
 * Works out which transition the region server reported as already in
 * progress, based on the text of the exception it sent back.
 *
 * @param t exception received from the region server
 * @return {@link RegionState.State#PENDING_OPEN} when the message contains
 *         the "already trying to OPEN" marker;
 *         {@link RegionState.State#PENDING_CLOSE} for any other message,
 *         including a missing one
 */
private State extractRegionState(RegionAlreadyInTransitionException t) {
  // Throwable#getMessage() is allowed to return null; guard against it so a
  // message-less exception cannot raise a NullPointerException here.
  String message = t.getMessage();
  if (message != null && message.contains(ALREADY_TRANSITIONING + "OPEN")) {
    return RegionState.State.PENDING_OPEN;
  }
  return RegionState.State.PENDING_CLOSE;
}
private void debugLog(HRegionInfo region, String string) {
if (region.isMetaTable() || region.isRootRegion()) {
LOG.info(string);
@ -2656,7 +2680,7 @@ public class AssignmentManager extends ZooKeeperListener {
/**
* State of a Region while undergoing transitions.
*/
public static class RegionState implements Writable {
public static class RegionState implements org.apache.hadoop.io.Writable {
private HRegionInfo region;
public enum State {

View File

@ -43,6 +43,7 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -155,6 +156,7 @@ import com.google.common.collect.Lists;
*/
public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
Runnable, RegionServerServices {
public static final Log LOG = LogFactory.getLog(HRegionServer.class);
// Set when a report to the master comes back with a message asking us to
@ -182,8 +184,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
private Path rootDir;
private final Random rand = new Random();
private final Set<byte[]> regionsInTransitionInRS =
new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
//RegionName vs current action in progress
//true - if open region action in progress
//false - if close region action in progress
private final ConcurrentSkipListMap<byte[], Boolean> regionsInTransitionInRS =
new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
/**
* Map of regions currently being served by this region server. Key is the
@ -306,6 +311,16 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
*/
private TableDescriptors tableDescriptors;
/*
* Strings used to build the message of a
* RegionAlreadyInTransitionException. The master searches the message for
* this combination of strings to work out which action was already in
* progress on the region server.
*/
private static final String ALREADY_TRANSITIONING = "for the region we are already trying to ";
private static final String RECEIVED = " received ";
private static final String OPEN = "OPEN ";
private static final String CLOSE = "CLOSE ";
/**
* Starts a HRegionServer at the default location
*
@ -803,7 +818,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
// iterator of onlineRegions to close all user regions.
for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
HRegionInfo hri = e.getValue().getRegionInfo();
if (!this.regionsInTransitionInRS.contains(hri.getEncodedNameAsBytes())) {
if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())) {
// Don't update zk with this close transition; pass false.
closeRegion(hri, abort, false);
}
@ -2352,9 +2367,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
public RegionOpeningState openRegion(HRegionInfo region, int versionOfOfflineNode)
throws IOException {
checkOpen();
if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) {
throw new RegionAlreadyInTransitionException("open", region.getEncodedName());
}
checkIfRegionInTransition(region,OPEN);
HRegion onlineRegion = this.getFromOnlineRegions(region.getEncodedName());
if (null != onlineRegion) {
LOG.warn("Attempted open of " + region.getEncodedName()
@ -2363,7 +2376,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
}
LOG.info("Received request to open region: " +
region.getRegionNameAsString());
this.regionsInTransitionInRS.add(region.getEncodedNameAsBytes());
this.regionsInTransitionInRS.putIfAbsent(region.getEncodedNameAsBytes(),
true);
HTableDescriptor htd = this.tableDescriptors.get(region.getTableName());
// Need to pass the expected version in the constructor.
if (region.isRootRegion()) {
@ -2378,6 +2392,25 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
}
return RegionOpeningState.OPENED;
}
/**
 * Rejects an open/close request for a region that this server is already
 * opening or closing.
 *
 * @param region region the request is for
 * @param currentAction the action just received ({@code OPEN} or {@code CLOSE})
 * @throws RegionAlreadyInTransitionException if the region has an entry in
 *         {@code regionsInTransitionInRS}
 */
private void checkIfRegionInTransition(HRegionInfo region,
    String currentAction) throws RegionAlreadyInTransitionException {
  byte[] encodedName = region.getEncodedNameAsBytes();
  // Read the map exactly once. A containsKey() followed by a separate get()
  // can see the entry disappear in between, and unboxing the resulting null
  // would throw a NullPointerException instead of the intended exception.
  Boolean openInProgress = this.regionsInTransitionInRS.get(encodedName);
  if (openInProgress != null) {
    // The below exception message will be used in master.
    throw new RegionAlreadyInTransitionException(getExceptionMessage(region,
        currentAction, openInProgress.booleanValue()));
  }
}

/**
 * Builds the message carried by RegionAlreadyInTransitionException. The
 * wording is assembled from the class's message constants, so it must not be
 * reworded here.
 *
 * @param region region the request is for
 * @param receivedAction the action just received ({@code OPEN} or {@code CLOSE})
 * @param openInProgress {@code true} if an open is already in progress for
 *        the region, {@code false} if a close is
 * @return the formatted exception message
 */
private String getExceptionMessage(HRegionInfo region, String receivedAction,
    boolean openInProgress) {
  return REGIONSERVER + ":" + this.getServerName() + RECEIVED
      + receivedAction + ALREADY_TRANSITIONING
      + (openInProgress ? OPEN : CLOSE)
      + "; " + region.getRegionNameAsString();
}
@Override
@QosPriority(priority=HIGH_QOS)
@ -2408,9 +2441,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
throw new NotServingRegionException("Received close for "
+ region.getRegionNameAsString() + " but we are not serving it");
}
if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) {
throw new RegionAlreadyInTransitionException("close", region.getEncodedName());
}
checkIfRegionInTransition(region, CLOSE);
return closeRegion(region, false, zk);
}
@ -2430,12 +2461,12 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
*/
protected boolean closeRegion(HRegionInfo region, final boolean abort,
final boolean zk) {
if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) {
if (this.regionsInTransitionInRS.containsKey(region.getEncodedNameAsBytes())) {
LOG.warn("Received close for region we are already opening or closing; " +
region.getEncodedName());
return false;
}
this.regionsInTransitionInRS.add(region.getEncodedNameAsBytes());
this.regionsInTransitionInRS.putIfAbsent(region.getEncodedNameAsBytes(), false);
CloseRegionHandler crh = null;
if (region.isRootRegion()) {
crh = new CloseRootHandler(this, this, region, abort, zk);
@ -3031,7 +3062,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
}
public Set<byte[]> getRegionsInTransitionInRS() {
public ConcurrentSkipListMap<byte[], Boolean> getRegionsInTransitionInRS() {
return this.regionsInTransitionInRS;
}

View File

@ -27,8 +27,7 @@ import java.io.IOException;
*/
public class RegionAlreadyInTransitionException extends IOException {
public RegionAlreadyInTransitionException(String action, String region) {
super("Received " + action + " for region we are" +
" already opening or closing; " + region);
public RegionAlreadyInTransitionException(String s) {
super(s);
}
}

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.zookeeper.KeeperException;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
/**
* Services provided by {@link HRegionServer}
@ -74,7 +75,7 @@ public interface RegionServerServices extends OnlineRegions {
/**
* Get the regions that are currently being opened or closed in the RS
* @return set of regions in transition in this RS
* @return map of regions in transition in this RS
*/
public Set<byte[]> getRegionsInTransitionInRS();
}
public ConcurrentSkipListMap<byte[], Boolean> getRegionsInTransitionInRS();
}

View File

@ -49,6 +49,8 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.junit.Assert.assertTrue;
/**
@ -239,7 +241,7 @@ public class TestZKBasedOpenCloseRegion {
HRegionInfo hri = getNonMetaRegion(hr0.getOnlineRegions());
// fake that hr1 is processing the region
hr1.getRegionsInTransitionInRS().add(hri.getEncodedNameAsBytes());
hr1.getRegionsInTransitionInRS().putIfAbsent(hri.getEncodedNameAsBytes(), true);
AtomicBoolean reopenEventProcessed = new AtomicBoolean(false);
EventHandlerListener openListener =
@ -252,12 +254,9 @@ public class TestZKBasedOpenCloseRegion {
TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(),
Bytes.toBytes(hr1.getServerName().toString()));
while (!reopenEventProcessed.get()) {
Threads.sleep(100);
}
// make sure the region was not opened on hr1, which is faking an in-progress transition
assertTrue(hr1.getOnlineRegion(hri.getEncodedNameAsBytes()) == null);
assertEquals(hr1.getOnlineRegion(hri.getEncodedNameAsBytes()), null);
// remove the block and reset the boolean
hr1.getRegionsInTransitionInRS().remove(hri.getEncodedNameAsBytes());

View File

@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@ -42,7 +44,8 @@ import org.apache.zookeeper.KeeperException;
class MockRegionServerServices implements RegionServerServices {
private final Map<String, HRegion> regions = new HashMap<String, HRegion>();
private boolean stopping = false;
private final Set<byte[]> rit = new HashSet<byte[]>();
private final ConcurrentSkipListMap<byte[], Boolean> rit =
new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
@Override
public boolean removeFromOnlineRegions(String encodedRegionName) {
@ -80,7 +83,7 @@ class MockRegionServerServices implements RegionServerServices {
}
@Override
public Set<byte[]> getRegionsInTransitionInRS() {
public ConcurrentSkipListMap<byte[], Boolean> getRegionsInTransitionInRS() {
return rit;
}
@ -132,10 +135,10 @@ class MockRegionServerServices implements RegionServerServices {
public boolean isStopped() {
return false;
}
}
/**
 * Reports whether this mock has been aborted; it never is.
 *
 * @return {@code false} unconditionally
 */
@Override
public boolean isAborted() {
return false;
}
}
}

View File

@ -132,7 +132,7 @@ public class TestOpenRegionHandler {
@Test
public void testFailedOpenRegion() throws Exception {
Server server = new MockServer(HTU);
RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
RegionServerServices rsServices = new MockRegionServerServices();
// Create it OFFLINE, which is what it expects
ZKAssign.createNodeOffline(server.getZooKeeper(), TEST_HRI, server.getServerName());
@ -157,7 +157,7 @@ public class TestOpenRegionHandler {
@Test
public void testFailedUpdateMeta() throws Exception {
Server server = new MockServer(HTU);
RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
RegionServerServices rsServices = new MockRegionServerServices();
// Create it OFFLINE, which is what it expects
ZKAssign.createNodeOffline(server.getZooKeeper(), TEST_HRI, server.getServerName());