HBASE-8797 Prevent merging regions from moving during online merge

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1499462 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
jxiang 2013-07-03 16:26:32 +00:00
parent 1095afa36f
commit 4ff281e50d
12 changed files with 257 additions and 136 deletions

View File

@ -85,7 +85,7 @@ public enum EventType {
*
* RS region merge has completed.
*/
RS_ZK_REGION_MERGE (9, ExecutorType.MASTER_SERVER_OPERATIONS),
RS_ZK_REGION_MERGED (9, ExecutorType.MASTER_SERVER_OPERATIONS),
/**
* Messages originating from Master to RS.<br>

View File

@ -46,7 +46,9 @@ public class RegionState implements org.apache.hadoop.io.Writable {
SPLITTING, // server started split of a region
SPLIT, // server completed split of a region
FAILED_OPEN, // failed to open, and won't retry any more
FAILED_CLOSE // failed to close, and won't retry any more
FAILED_CLOSE, // failed to close, and won't retry any more
MERGING, // server started merge a region
MERGED // server completed merge a region
}
// Many threads can update the state at the stamp at the same time
@ -124,10 +126,6 @@ public class RegionState implements org.apache.hadoop.io.Writable {
return state == State.SPLITTING;
}
public boolean isSplit() {
return state == State.SPLIT;
}
public boolean isFailedOpen() {
return state == State.FAILED_OPEN;
}
@ -136,6 +134,14 @@ public class RegionState implements org.apache.hadoop.io.Writable {
return state == State.FAILED_CLOSE;
}
public boolean isMerging() {
return state == State.MERGING;
}
public boolean isOpenOrMergingOnServer(final ServerName sn) {
return isOnServer(sn) && (isOpened() || isMerging());
}
public boolean isPendingOpenOrOpeningOnServer(final ServerName sn) {
return isOnServer(sn) && (isPendingOpen() || isOpening());
}
@ -211,6 +217,12 @@ public class RegionState implements org.apache.hadoop.io.Writable {
case FAILED_CLOSE:
rs = ClusterStatusProtos.RegionState.State.FAILED_CLOSE;
break;
case MERGING:
rs = ClusterStatusProtos.RegionState.State.MERGING;
break;
case MERGED:
rs = ClusterStatusProtos.RegionState.State.MERGED;
break;
default:
throw new IllegalStateException("");
}
@ -261,6 +273,12 @@ public class RegionState implements org.apache.hadoop.io.Writable {
case FAILED_CLOSE:
state = State.FAILED_CLOSE;
break;
case MERGING:
state = State.MERGING;
break;
case MERGED:
state = State.MERGED;
break;
default:
throw new IllegalStateException("");
}

View File

@ -65,6 +65,8 @@ public final class ClusterStatusProtos {
SPLIT(8, 8),
FAILED_OPEN(9, 9),
FAILED_CLOSE(10, 10),
MERGING(11, 11),
MERGED(12, 12),
;
public static final int OFFLINE_VALUE = 0;
@ -78,6 +80,8 @@ public final class ClusterStatusProtos {
public static final int SPLIT_VALUE = 8;
public static final int FAILED_OPEN_VALUE = 9;
public static final int FAILED_CLOSE_VALUE = 10;
public static final int MERGING_VALUE = 11;
public static final int MERGED_VALUE = 12;
public final int getNumber() { return value; }
@ -95,6 +99,8 @@ public final class ClusterStatusProtos {
case 8: return SPLIT;
case 9: return FAILED_OPEN;
case 10: return FAILED_CLOSE;
case 11: return MERGING;
case 12: return MERGED;
default: return null;
}
}
@ -125,7 +131,7 @@ public final class ClusterStatusProtos {
}
private static final State[] VALUES = {
OFFLINE, PENDING_OPEN, OPENING, OPEN, PENDING_CLOSE, CLOSING, CLOSED, SPLITTING, SPLIT, FAILED_OPEN, FAILED_CLOSE,
OFFLINE, PENDING_OPEN, OPENING, OPEN, PENDING_CLOSE, CLOSING, CLOSED, SPLITTING, SPLIT, FAILED_OPEN, FAILED_CLOSE, MERGING, MERGED,
};
public static State valueOf(
@ -7112,46 +7118,46 @@ public final class ClusterStatusProtos {
static {
java.lang.String[] descriptorData = {
"\n\023ClusterStatus.proto\032\013hbase.proto\032\017Clus" +
"terId.proto\032\010FS.proto\"\211\002\n\013RegionState\022\037\n" +
"terId.proto\032\010FS.proto\"\242\002\n\013RegionState\022\037\n" +
"\nregionInfo\030\001 \002(\0132\013.RegionInfo\022!\n\005state\030" +
"\002 \002(\0162\022.RegionState.State\022\r\n\005stamp\030\003 \001(\004" +
"\"\246\001\n\005State\022\013\n\007OFFLINE\020\000\022\020\n\014PENDING_OPEN\020" +
"\"\277\001\n\005State\022\013\n\007OFFLINE\020\000\022\020\n\014PENDING_OPEN\020" +
"\001\022\013\n\007OPENING\020\002\022\010\n\004OPEN\020\003\022\021\n\rPENDING_CLOS" +
"E\020\004\022\013\n\007CLOSING\020\005\022\n\n\006CLOSED\020\006\022\r\n\tSPLITTIN" +
"G\020\007\022\t\n\005SPLIT\020\010\022\017\n\013FAILED_OPEN\020\t\022\020\n\014FAILE" +
"D_CLOSE\020\n\"W\n\022RegionInTransition\022\036\n\004spec\030" +
"\001 \002(\0132\020.RegionSpecifier\022!\n\013regionState\030\002",
" \002(\0132\014.RegionState\"\260\003\n\nRegionLoad\022)\n\017reg" +
"ionSpecifier\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006" +
"stores\030\002 \001(\r\022\022\n\nstorefiles\030\003 \001(\r\022\037\n\027stor" +
"eUncompressedSizeMB\030\004 \001(\r\022\027\n\017storefileSi" +
"zeMB\030\005 \001(\r\022\026\n\016memstoreSizeMB\030\006 \001(\r\022\034\n\024st" +
"orefileIndexSizeMB\030\007 \001(\r\022\031\n\021readRequests" +
"Count\030\010 \001(\004\022\032\n\022writeRequestsCount\030\t \001(\004\022" +
"\032\n\022totalCompactingKVs\030\n \001(\004\022\033\n\023currentCo" +
"mpactedKVs\030\013 \001(\004\022\027\n\017rootIndexSizeKB\030\014 \001(" +
"\r\022\036\n\026totalStaticIndexSizeKB\030\r \001(\r\022\036\n\026tot",
"alStaticBloomSizeKB\030\016 \001(\r\022\032\n\022completeSeq" +
"uenceId\030\017 \001(\004\"\372\001\n\nServerLoad\022\030\n\020numberOf" +
"Requests\030\001 \001(\r\022\035\n\025totalNumberOfRequests\030" +
"\002 \001(\r\022\022\n\nusedHeapMB\030\003 \001(\r\022\021\n\tmaxHeapMB\030\004" +
" \001(\r\022 \n\013regionLoads\030\005 \003(\0132\013.RegionLoad\022\"" +
"\n\014coprocessors\030\006 \003(\0132\014.Coprocessor\022\027\n\017re" +
"portStartTime\030\007 \001(\004\022\025\n\rreportEndTime\030\010 \001" +
"(\004\022\026\n\016infoServerPort\030\t \001(\r\"N\n\016LiveServer" +
"Info\022\033\n\006server\030\001 \002(\0132\013.ServerName\022\037\n\nser" +
"verLoad\030\002 \002(\0132\013.ServerLoad\"\327\002\n\rClusterSt",
"atus\022.\n\014hbaseVersion\030\001 \001(\0132\030.HBaseVersio" +
"nFileContent\022$\n\013liveServers\030\002 \003(\0132\017.Live" +
"ServerInfo\022 \n\013deadServers\030\003 \003(\0132\013.Server" +
"Name\0220\n\023regionsInTransition\030\004 \003(\0132\023.Regi" +
"onInTransition\022\035\n\tclusterId\030\005 \001(\0132\n.Clus" +
"terId\022(\n\022masterCoprocessors\030\006 \003(\0132\014.Copr" +
"ocessor\022\033\n\006master\030\007 \001(\0132\013.ServerName\022\"\n\r" +
"backupMasters\030\010 \003(\0132\013.ServerName\022\022\n\nbala" +
"ncerOn\030\t \001(\010BF\n*org.apache.hadoop.hbase." +
"protobuf.generatedB\023ClusterStatusProtosH",
"\001\240\001\001"
"D_CLOSE\020\n\022\013\n\007MERGING\020\013\022\n\n\006MERGED\020\014\"W\n\022Re" +
"gionInTransition\022\036\n\004spec\030\001 \002(\0132\020.RegionS",
"pecifier\022!\n\013regionState\030\002 \002(\0132\014.RegionSt" +
"ate\"\260\003\n\nRegionLoad\022)\n\017regionSpecifier\030\001 " +
"\002(\0132\020.RegionSpecifier\022\016\n\006stores\030\002 \001(\r\022\022\n" +
"\nstorefiles\030\003 \001(\r\022\037\n\027storeUncompressedSi" +
"zeMB\030\004 \001(\r\022\027\n\017storefileSizeMB\030\005 \001(\r\022\026\n\016m" +
"emstoreSizeMB\030\006 \001(\r\022\034\n\024storefileIndexSiz" +
"eMB\030\007 \001(\r\022\031\n\021readRequestsCount\030\010 \001(\004\022\032\n\022" +
"writeRequestsCount\030\t \001(\004\022\032\n\022totalCompact" +
"ingKVs\030\n \001(\004\022\033\n\023currentCompactedKVs\030\013 \001(" +
"\004\022\027\n\017rootIndexSizeKB\030\014 \001(\r\022\036\n\026totalStati",
"cIndexSizeKB\030\r \001(\r\022\036\n\026totalStaticBloomSi" +
"zeKB\030\016 \001(\r\022\032\n\022completeSequenceId\030\017 \001(\004\"\372" +
"\001\n\nServerLoad\022\030\n\020numberOfRequests\030\001 \001(\r\022" +
"\035\n\025totalNumberOfRequests\030\002 \001(\r\022\022\n\nusedHe" +
"apMB\030\003 \001(\r\022\021\n\tmaxHeapMB\030\004 \001(\r\022 \n\013regionL" +
"oads\030\005 \003(\0132\013.RegionLoad\022\"\n\014coprocessors\030" +
"\006 \003(\0132\014.Coprocessor\022\027\n\017reportStartTime\030\007" +
" \001(\004\022\025\n\rreportEndTime\030\010 \001(\004\022\026\n\016infoServe" +
"rPort\030\t \001(\r\"N\n\016LiveServerInfo\022\033\n\006server\030" +
"\001 \002(\0132\013.ServerName\022\037\n\nserverLoad\030\002 \002(\0132\013",
".ServerLoad\"\327\002\n\rClusterStatus\022.\n\014hbaseVe" +
"rsion\030\001 \001(\0132\030.HBaseVersionFileContent\022$\n" +
"\013liveServers\030\002 \003(\0132\017.LiveServerInfo\022 \n\013d" +
"eadServers\030\003 \003(\0132\013.ServerName\0220\n\023regions" +
"InTransition\030\004 \003(\0132\023.RegionInTransition\022" +
"\035\n\tclusterId\030\005 \001(\0132\n.ClusterId\022(\n\022master" +
"Coprocessors\030\006 \003(\0132\014.Coprocessor\022\033\n\006mast" +
"er\030\007 \001(\0132\013.ServerName\022\"\n\rbackupMasters\030\010" +
" \003(\0132\013.ServerName\022\022\n\nbalancerOn\030\t \001(\010BF\n" +
"*org.apache.hadoop.hbase.protobuf.genera",
"tedB\023ClusterStatusProtosH\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

View File

@ -43,6 +43,8 @@ message RegionState {
SPLIT = 8; // server completed split of a region
FAILED_OPEN = 9; // failed to open, and won't retry any more
FAILED_CLOSE = 10; // failed to close, and won't retry any more
MERGING = 11; // server started merge a region
MERGED = 12; // server completed merge of a region
}
}

View File

@ -543,6 +543,7 @@ public class AssignmentManager extends ZooKeeperListener {
// Get ServerName. Could not be null.
final ServerName sn = rt.getServerName();
final String encodedRegionName = regionInfo.getEncodedName();
final String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedRegionName);
LOG.info("Processing region " + regionInfo.getRegionNameAsString() + " in state " + et);
@ -635,38 +636,45 @@ public class AssignmentManager extends ZooKeeperListener {
// The regionserver started the split, but died before updating the status.
// It means (hopefully) that the split was not finished
// TBD - to study. In the meantime, do nothing as in the past.
LOG.warn("Processed region " + regionInfo.getEncodedName() + " in state : " + et +
LOG.warn("Processed region " + prettyPrintedRegionName + " in state : " + et +
" on a dead regionserver: " + sn + " doing nothing");
} else {
LOG.info("Processed region " + regionInfo.getEncodedName() + " in state : " +
et + " nothing to do.");
// We don't do anything. The way the code is written in RS_ZK_REGION_SPLIT management,
// it adds the RS_ZK_REGION_SPLITTING state if needed. So we don't have to do it here.
regionStates.updateRegionState(rt, RegionState.State.SPLITTING);
LOG.info("Processed region " + prettyPrintedRegionName
+ " in state : " + et);
}
break;
case RS_ZK_REGION_SPLIT:
if (!serverManager.isServerOnline(sn)) {
forceOffline(regionInfo, rt);
} else {
LOG.info("Processed region " + regionInfo.getEncodedName() + " in state : " +
LOG.info("Processed region " + prettyPrintedRegionName + " in state : " +
et + " nothing to do.");
// We don't do anything. The regionserver is supposed to update the znode
// multiple times so if it's still up we will receive an update soon.
}
break;
case RS_ZK_REGION_MERGING:
// nothing to do
LOG.info("Processed region " + regionInfo.getEncodedName()
+ " in state : " + et + " nothing to do.");
if (!serverManager.isServerOnline(sn)) {
// The regionserver started the merge, but died before updating the status.
// It means (hopefully) that the merge was not finished
// This node should be gone soon since it is ephemeral.
LOG.warn("Processed region " + prettyPrintedRegionName + " in state : " + et +
" on a dead regionserver: " + sn + " doing nothing");
} else {
handleRegionMerging(rt, prettyPrintedRegionName, sn);
LOG.info("Processed region " + prettyPrintedRegionName
+ " in state : " + et);
}
break;
case RS_ZK_REGION_MERGE:
case RS_ZK_REGION_MERGED:
if (!serverManager.isServerOnline(sn)) {
// ServerShutdownHandler would handle this region
LOG.warn("Processed region " + regionInfo.getEncodedName()
LOG.warn("Processed region " + prettyPrintedRegionName
+ " in state : " + et + " on a dead regionserver: " + sn
+ " doing nothing");
} else {
LOG.info("Processed region " + regionInfo.getEncodedName() + " in state : " +
LOG.info("Processed region " + prettyPrintedRegionName + " in state : " +
et + " nothing to do.");
// We don't do anything. The regionserver is supposed to update the znode
// multiple times so if it's still up we will receive an update soon.
@ -834,10 +842,11 @@ public class AssignmentManager extends ZooKeeperListener {
case RS_ZK_REGION_MERGING:
// Merged region is a new region, we can't find it in the region states now.
// Do nothing.
// However, the two merging regions are not new. They should be in state for merging.
handleRegionMerging(rt, prettyPrintedRegionName, sn);
break;
case RS_ZK_REGION_MERGE:
case RS_ZK_REGION_MERGED:
// Assert that we can get a serverinfo for this server.
if (!this.serverManager.isServerOnline(sn)) {
LOG.error("Dropped merge! ServerName=" + sn + " unknown.");
@ -855,9 +864,16 @@ public class AssignmentManager extends ZooKeeperListener {
break;
}
assert mergeRegions.size() == 3;
// Run handler to do the rest of the MERGE handling.
this.executorService.submit(new MergedRegionHandler(server, this, sn,
mergeRegions));
HRegionInfo merge_a = mergeRegions.get(1);
HRegionInfo merge_b = mergeRegions.get(2);
if (!isInStateForMerging(sn, merge_a, merge_b)) {
// Move on. Merge already happened (passed PONR), no point to stop now
LOG.warn("Got merge event, but not in state good for MERGED; rs_a="
+ merge_a + ", rs_b=" + merge_b);
}
// Run handler to do the rest of the MERGED handling.
this.executorService.submit(new MergedRegionHandler(
server, this, sn, mergeRegions));
break;
case M_ZK_REGION_CLOSING:
@ -991,6 +1007,17 @@ public class AssignmentManager extends ZooKeeperListener {
return false;
}
/**
* @return Returns true if both regions are merging/open on specified server
*/
private boolean isInStateForMerging(final ServerName sn,
final HRegionInfo a, final HRegionInfo b) {
RegionState rs_a = regionStates.getRegionState(a);
RegionState rs_b = regionStates.getRegionState(b);
return ((rs_a == null || rs_a.isOpenOrMergingOnServer(sn))
&& (rs_b == null || rs_b.isOpenOrMergingOnServer(sn)));
}
// TODO: processFavoredNodes might throw an exception, for e.g., if the
// meta could not be contacted/updated. We need to see how seriously to treat
// this problem as. Should we fail the current assignment. We should be able
@ -1188,31 +1215,25 @@ public class AssignmentManager extends ZooKeeperListener {
if (rs == null) return;
HRegionInfo regionInfo = rs.getRegion();
if (rs.isSplit()) {
LOG.debug("Ephemeral node deleted, regionserver crashed?, " +
"clearing from RIT; rs=" + rs);
regionOffline(rs.getRegion());
} else {
String regionNameStr = regionInfo.getRegionNameAsString();
LOG.debug("The znode of region " + regionNameStr
+ " has been deleted.");
if (rs.isOpened()) {
ServerName serverName = rs.getServerName();
regionOnline(regionInfo, serverName);
LOG.info("The master has opened the region "
+ regionNameStr + " that was online on " + serverName);
boolean disabled = getZKTable().isDisablingOrDisabledTable(
regionInfo.getTableNameAsString());
if (!serverManager.isServerOnline(serverName) && !disabled) {
LOG.info("Opened region " + regionNameStr
+ "but the region server is offline, reassign the region");
assign(regionInfo, true);
} else if (disabled) {
// if server is offline, no hurt to unassign again
LOG.info("Opened region " + regionNameStr
+ "but this table is disabled, triggering close of region");
unassign(regionInfo);
}
String regionNameStr = regionInfo.getRegionNameAsString();
LOG.debug("The znode of region " + regionNameStr
+ " has been deleted, region state: " + rs);
if (rs.isOpened()) {
ServerName serverName = rs.getServerName();
regionOnline(regionInfo, serverName);
LOG.info("The master has opened the region "
+ regionNameStr + " that was online on " + serverName);
boolean disabled = getZKTable().isDisablingOrDisabledTable(
regionInfo.getTableNameAsString());
if (!serverManager.isServerOnline(serverName) && !disabled) {
LOG.info("Opened region " + regionNameStr
+ "but the region server is offline, reassign the region");
assign(regionInfo, true);
} else if (disabled) {
// if server is offline, no hurt to unassign again
LOG.info("Opened region " + regionNameStr
+ "but this table is disabled, triggering close of region");
unassign(regionInfo);
}
}
} finally {
@ -1261,7 +1282,9 @@ public class AssignmentManager extends ZooKeeperListener {
RegionTransition rt = RegionTransition.parseFrom(data);
//See HBASE-7551, handle splitting too, in case we miss the node change event
if (rt.getEventType() == EventType.RS_ZK_REGION_SPLITTING) {
EventType type = rt.getEventType();
if (type == EventType.RS_ZK_REGION_SPLITTING
|| type == EventType.RS_ZK_REGION_MERGING) {
handleRegion(rt, stat.getVersion());
}
} catch (DeserializationException de) {
@ -2218,8 +2241,8 @@ public class AssignmentManager extends ZooKeeperListener {
NodeExistsException nee = (NodeExistsException)e;
String path = nee.getPath();
try {
if (isSplitOrSplittingOrMergeOrMerging(path)) {
LOG.debug(path + " is SPLIT or SPLITTING or MERGE or MERGING; " +
if (isSplitOrSplittingOrMergedOrMerging(path)) {
LOG.debug(path + " is SPLIT or SPLITTING or MERGED or MERGING; " +
"skipping unassign because region no longer exists -- its split or merge");
return;
}
@ -2298,14 +2321,14 @@ public class AssignmentManager extends ZooKeeperListener {
/**
* @param path
* @return True if znode is in SPLIT or SPLITTING or MERGE or MERGING state.
* @return True if znode is in SPLIT or SPLITTING or MERGED or MERGING state.
* @throws KeeperException Can happen if the znode went away in meantime.
* @throws DeserializationException
*/
private boolean isSplitOrSplittingOrMergeOrMerging(final String path)
private boolean isSplitOrSplittingOrMergedOrMerging(final String path)
throws KeeperException, DeserializationException {
boolean result = false;
// This may fail if the SPLIT or SPLITTING or MERGE or MERGING znode gets
// This may fail if the SPLIT or SPLITTING or MERGED or MERGING znode gets
// cleaned up before we can get data from it.
byte [] data = ZKAssign.getData(watcher, path);
if (data == null) return false;
@ -2313,7 +2336,7 @@ public class AssignmentManager extends ZooKeeperListener {
switch (rt.getEventType()) {
case RS_ZK_REGION_SPLIT:
case RS_ZK_REGION_SPLITTING:
case RS_ZK_REGION_MERGE:
case RS_ZK_REGION_MERGED:
case RS_ZK_REGION_MERGING:
result = true;
break;
@ -2922,6 +2945,7 @@ public class AssignmentManager extends ZooKeeperListener {
case SPLITTING:
case FAILED_OPEN:
case FAILED_CLOSE:
case MERGING:
break;
default:
@ -3033,7 +3057,7 @@ public class AssignmentManager extends ZooKeeperListener {
}
}
}
List<HRegionInfo> regions = regionStates.serverOffline(sn);
List<HRegionInfo> regions = regionStates.serverOffline(watcher, sn);
for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
HRegionInfo hri = it.next();
String encodedName = hri.getEncodedName();
@ -3049,12 +3073,13 @@ public class AssignmentManager extends ZooKeeperListener {
+ " since it is not opening on the dead server any more: " + sn);
it.remove();
} else {
try{
try {
// Delete the ZNode if exists
ZKAssign.deleteNodeFailSilent(watcher, hri);
} catch (KeeperException ke) {
server.abort("Unexpected ZK exception deleting node " + hri, ke);
}
if (zkTable.isDisablingOrDisabledTable(hri.getTableNameAsString())) {
it.remove();
regionStates.regionOffline(hri);
@ -3185,4 +3210,34 @@ public class AssignmentManager extends ZooKeeperListener {
}
return true;
}
/**
* A helper to handle region merging transition event.
* It transitions merging regions to MERGING state.
*/
private boolean handleRegionMerging(final RegionTransition rt,
final String prettyPrintedRegionName, final ServerName sn) {
byte [] payloadOfMerging = rt.getPayload();
List<HRegionInfo> mergingRegions;
try {
mergingRegions = HRegionInfo.parseDelimitedFrom(
payloadOfMerging, 0, payloadOfMerging.length);
} catch (IOException e) {
LOG.error("Dropped merging! Failed reading merging payload for "
+ prettyPrintedRegionName);
return false;
}
assert mergingRegions.size() == 2;
HRegionInfo merging_a = mergingRegions.get(0);
HRegionInfo merging_b = mergingRegions.get(1);
if (!isInStateForMerging(sn, merging_a, merging_b)) {
LOG.warn("Dropped merging! Not in state good for MERGING; rs_a="
+ merging_a + ", rs_b=" + merging_b);
return false;
}
regionStates.updateRegionState(merging_a, RegionState.State.MERGING);
regionStates.updateRegionState(merging_b, RegionState.State.MERGING);
return true;
}
}

View File

@ -131,7 +131,7 @@ public class GeneralBulkAssigner extends BulkAssigner {
HRegionInfo hri = regionInfoIterator.next();
RegionState state = regionStates.getRegionState(hri);
if ((!regionStates.isRegionInTransition(hri) && regionStates.isRegionAssigned(hri))
|| state.isSplit() || state.isSplitting()) {
|| state.isSplitting() || state.isMerging()) {
regionInfoIterator.remove();
}
}

View File

@ -38,6 +38,9 @@ import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
/**
* Region state accountant. It holds the states of all regions in the memory.
@ -204,8 +207,7 @@ public class RegionStates {
}
/**
* Update a region state. If it is not splitting,
* it will be put in transition if not already there.
* Update a region state. It will be put in transition if not already there.
*/
public synchronized RegionState updateRegionState(
final HRegionInfo hri, final State state) {
@ -216,8 +218,7 @@ public class RegionStates {
}
/**
* Update a region state. If it is not splitting,
* it will be put in transition if not already there.
* Update a region state. It will be put in transition if not already there.
*
* If we can't find the region info based on the region name in
* the transition, log a warning and return null.
@ -239,8 +240,7 @@ public class RegionStates {
}
/**
* Update a region state. If it is not splitting,
* it will be put in transition if not already there.
* Update a region state. It will be put in transition if not already there.
*/
public synchronized RegionState updateRegionState(
final HRegionInfo hri, final State state, final ServerName serverName) {
@ -253,8 +253,8 @@ public class RegionStates {
}
if (state == State.FAILED_CLOSE || state == State.FAILED_OPEN) {
LOG.warn("Failed to transition " + hri.getShortNameToLog() + " on " + serverName + ": " +
state);
LOG.warn("Failed to transition " + hri.getShortNameToLog()
+ " on " + serverName + ": " + state);
}
String regionName = hri.getEncodedName();
@ -264,8 +264,8 @@ public class RegionStates {
if (oldState == null || oldState.getState() != regionState.getState()) {
LOG.info("Region transitioned from " + oldState + " to " + regionState);
}
if (state != State.SPLITTING && (newServerName != null
|| (state != State.PENDING_CLOSE && state != State.CLOSING))) {
if (newServerName != null || (
state != State.PENDING_CLOSE && state != State.CLOSING)) {
regionsInTransition.put(regionName, regionState);
}
@ -341,7 +341,8 @@ public class RegionStates {
/**
* A server is offline, all regions on it are dead.
*/
public synchronized List<HRegionInfo> serverOffline(final ServerName sn) {
public synchronized List<HRegionInfo> serverOffline(
final ZooKeeperWatcher watcher, final ServerName sn) {
// Clean up this server from map of servers to regions, and remove all regions
// of this server from online map of regions.
List<HRegionInfo> rits = new ArrayList<HRegionInfo>();
@ -358,9 +359,19 @@ public class RegionStates {
HRegionInfo hri = state.getRegion();
if (assignedRegions.contains(hri)) {
// Region is open on this region server, but in transition.
// This region must be moving away from this server.
// This region must be moving away from this server, or splitting/merging.
// SSH will handle it, either skip assigning, or re-assign.
LOG.info("Transitioning region " + state + " will be handled by SSH for " + sn);
if (state.isSplitting() || state.isMerging()) {
LOG.info("Offline splitting/merging region " + state);
try {
// Delete the ZNode if exists
ZKAssign.deleteNodeFailSilent(watcher, hri);
regionOffline(hri);
} catch (KeeperException ke) {
server.abort("Unexpected ZK exception deleting node " + hri, ke);
}
}
} else if (sn.equals(state.getServerName())) {
// Region is in transition on this region server, and this
// region is not open on this server. So the region must be

View File

@ -35,7 +35,7 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
/**
* Handles MERGE regions event on Master, master receive the merge report from
* Handles MERGED regions event on Master, master receive the merge report from
* the regionserver, then offline the merging regions and online the merged
* region.Here region_a sorts before region_b.
*/
@ -52,7 +52,7 @@ public class MergedRegionHandler extends EventHandler implements
public MergedRegionHandler(Server server,
AssignmentManager assignmentManager, ServerName sn,
final List<HRegionInfo> mergeRegions) {
super(server, EventType.RS_ZK_REGION_MERGE);
super(server, EventType.RS_ZK_REGION_MERGED);
assert mergeRegions.size() == 3;
this.assignmentManager = assignmentManager;
this.merged = mergeRegions.get(0);
@ -96,7 +96,7 @@ public class MergedRegionHandler extends EventHandler implements
// It's possible that the RS tickles in between the reading of the
// znode and the deleting, so it's safe to retry.
successful = ZKAssign.deleteNode(this.server.getZooKeeper(),
encodedRegionName, EventType.RS_ZK_REGION_MERGE);
encodedRegionName, EventType.RS_ZK_REGION_MERGED);
}
} catch (KeeperException e) {
if (e instanceof NoNodeException) {
@ -105,11 +105,11 @@ public class MergedRegionHandler extends EventHandler implements
LOG.debug("The znode " + znodePath
+ " does not exist. May be deleted already.");
} else {
server.abort("Error deleting MERGE node in ZK for transition ZK node ("
server.abort("Error deleting MERGED node in ZK for transition ZK node ("
+ merged.getEncodedName() + ")", e);
}
}
LOG.info("Handled MERGE event; merged="
LOG.info("Handled MERGED event; merged="
+ this.merged.getRegionNameAsString() + " region_a="
+ this.region_a.getRegionNameAsString() + "region_b="
+ this.region_b.getRegionNameAsString());

View File

@ -230,9 +230,9 @@ public class ServerShutdownHandler extends EventHandler {
}
if (rit != null) {
if (!rit.isOnServer(serverName)
|| rit.isClosed() || rit.isOpened() || rit.isSplit()) {
|| rit.isClosed() || rit.isOpened()) {
// Skip regions that are in transition on other server,
// or in state closed/opened/split
// or in state closed/opened
LOG.info("Skip assigning region " + rit);
continue;
}
@ -247,15 +247,7 @@ public class ServerShutdownHandler extends EventHandler {
}
toAssignRegions.add(hri);
} else if (rit != null) {
if (rit.isSplitting() || rit.isSplit()) {
// This will happen when the RS went down and the call back for the SPLIITING or SPLIT
// has not yet happened for node Deleted event. In that case if the region was actually
// split
// but the RS had gone down before completing the split process then will not try to
// assign the parent region again. In that case we should make the region offline and
// also delete the region from RIT.
am.regionOffline(hri);
} else if ((rit.isClosing() || rit.isPendingClose())
if ((rit.isClosing() || rit.isPendingClose())
&& am.getZKTable().isDisablingOrDisabledTable(hri.getTableNameAsString())) {
// If the table was partially disabled and the RS went down, we should clear the RIT
// and remove the node for the region.

View File

@ -259,7 +259,7 @@ public class RegionMergeTransaction {
if (server != null && server.getZooKeeper() != null) {
try {
createNodeMerging(server.getZooKeeper(), this.mergedRegionInfo,
server.getServerName());
server.getServerName(), region_a.getRegionInfo(), region_b.getRegionInfo());
} catch (KeeperException e) {
throw new IOException("Failed creating MERGING znode on "
+ this.mergedRegionInfo.getRegionNameAsString(), e);
@ -274,7 +274,8 @@ public class RegionMergeTransaction {
// Note that if the transition fails then the rollback will delete the
// created znode as the journal entry SET_MERGING_IN_ZK is added.
this.znodeVersion = transitionNodeMerging(server.getZooKeeper(),
this.mergedRegionInfo, server.getServerName(), -1);
this.mergedRegionInfo, server.getServerName(), -1,
region_a.getRegionInfo(), region_b.getRegionInfo());
} catch (KeeperException e) {
throw new IOException("Failed setting MERGING znode on "
+ this.mergedRegionInfo.getRegionNameAsString(), e);
@ -298,6 +299,19 @@ public class RegionMergeTransaction {
// clean this up.
mergeStoreFiles(hstoreFilesOfRegionA, hstoreFilesOfRegionB);
if (server != null && server.getZooKeeper() != null) {
try {
// Do one more check on the merging znode (before it is too late) in case
// any merging region is moved somehow. If so, the znode transition will fail.
this.znodeVersion = transitionNodeMerging(server.getZooKeeper(),
this.mergedRegionInfo, server.getServerName(), this.znodeVersion,
region_a.getRegionInfo(), region_b.getRegionInfo());
} catch (KeeperException e) {
throw new IOException("Failed setting MERGING znode on "
+ this.mergedRegionInfo.getRegionNameAsString(), e);
}
}
// Log to the journal that we are creating merged region. We could fail
// halfway through. If we do, we could have left
// stuff in fs that needs cleanup -- a storefile or two. Thats why we
@ -665,18 +679,20 @@ public class RegionMergeTransaction {
* @throws IOException
*/
int createNodeMerging(final ZooKeeperWatcher zkw, final HRegionInfo region,
final ServerName serverName) throws KeeperException, IOException {
final ServerName serverName, final HRegionInfo a,
final HRegionInfo b) throws KeeperException, IOException {
LOG.debug(zkw.prefix("Creating ephemeral node for "
+ region.getEncodedName() + " in MERGING state"));
byte [] payload = HRegionInfo.toDelimitedByteArray(a, b);
RegionTransition rt = RegionTransition.createRegionTransition(
EventType.RS_ZK_REGION_MERGING, region.getRegionName(), serverName);
EventType.RS_ZK_REGION_MERGING, region.getRegionName(), serverName, payload);
String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) {
throw new IOException("Failed create of ephemeral " + node);
}
// Transition node from MERGING to MERGING and pick up version so we
// can be sure this znode is ours; version is needed deleting.
return transitionNodeMerging(zkw, region, serverName, -1);
return transitionNodeMerging(zkw, region, serverName, -1, a, b);
}
/**
@ -723,7 +739,7 @@ public class RegionMergeTransaction {
final int znodeVersion) throws KeeperException, IOException {
byte[] payload = HRegionInfo.toDelimitedByteArray(merged, a, b);
return ZKAssign.transitionNode(zkw, merged, serverName,
EventType.RS_ZK_REGION_MERGING, EventType.RS_ZK_REGION_MERGE,
EventType.RS_ZK_REGION_MERGING, EventType.RS_ZK_REGION_MERGED,
znodeVersion, payload);
}
@ -738,11 +754,12 @@ public class RegionMergeTransaction {
* @throws IOException
*/
int transitionNodeMerging(final ZooKeeperWatcher zkw,
final HRegionInfo parent, final ServerName serverName, final int version)
throws KeeperException, IOException {
final HRegionInfo parent, final ServerName serverName, final int version,
final HRegionInfo a, final HRegionInfo b) throws KeeperException, IOException {
byte[] payload = HRegionInfo.toDelimitedByteArray(a, b);
return ZKAssign.transitionNode(zkw, parent, serverName,
EventType.RS_ZK_REGION_MERGING, EventType.RS_ZK_REGION_MERGING,
version);
version, payload);
}
private static int tickleNodeMerge(ZooKeeperWatcher zkw, HRegionInfo merged,
@ -750,7 +767,7 @@ public class RegionMergeTransaction {
final int znodeVersion) throws KeeperException, IOException {
byte[] payload = HRegionInfo.toDelimitedByteArray(a, b);
return ZKAssign.transitionNode(zkw, merged, serverName,
EventType.RS_ZK_REGION_MERGE, EventType.RS_ZK_REGION_MERGE,
EventType.RS_ZK_REGION_MERGED, EventType.RS_ZK_REGION_MERGED,
znodeVersion, payload);
}

View File

@ -1215,4 +1215,28 @@ public class TestAssignmentManager {
t.start();
while (!t.isAlive()) Threads.sleep(1);
}
@Test
public void testForceAssignMergingRegion() throws Exception {
// Region to use in test.
final HRegionInfo hri = HRegionInfo.FIRST_META_REGIONINFO;
// Need a mocked catalog tracker.
CatalogTracker ct = Mockito.mock(CatalogTracker.class);
LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(
server.getConfiguration());
// Create an AM.
AssignmentManager am = new AssignmentManager(this.server,
this.serverManager, ct, balancer, null, null, master.getTableLockManager());
RegionStates regionStates = am.getRegionStates();
try {
// First set the state of the region to merging
regionStates.updateRegionState(hri, RegionState.State.MERGING);
// Now, try to assign it with force new plan
am.assign(hri, true, true);
assertEquals("The region should be still in merging state",
RegionState.State.MERGING, regionStates.getRegionState(hri).getState());
} finally {
am.shutdown();
}
}
}

View File

@ -113,7 +113,6 @@ public class TestRegionMergeTransactionOnCluster {
verifyRowCount(table, ROWSIZE);
table.close();
}
@Test
@ -189,8 +188,6 @@ public class TestRegionMergeTransactionOnCluster {
} finally {
admin.enableCatalogJanitor(true);
}
}
private void mergeRegionsAndVerifyRegionNum(HMaster master, byte[] tablename,
@ -293,5 +290,4 @@ public class TestRegionMergeTransactionOnCluster {
assertEquals(expectedRegionNum, rowCount);
scanner.close();
}
}