HBASE-9696 Master recovery ignores online merge znode

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1531435 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
jxiang 2013-10-11 21:46:14 +00:00
parent 0b204c8132
commit 30837458ee
16 changed files with 895 additions and 807 deletions

View File

@ -59,13 +59,13 @@ public enum EventType {
/** /**
* RS_ZK_REGION_SPLITTING<br> * RS_ZK_REGION_SPLITTING<br>
* *
* RS has started a region split. * RS has started a region split after master says it's ok to move on.
*/ */
RS_ZK_REGION_SPLITTING (5, null), RS_ZK_REGION_SPLITTING (5, null),
/** /**
* RS_ZK_REGION_SPLIT<br> * RS_ZK_REGION_SPLIT<br>
* *
* RS split has completed. * RS split has completed and is notifying the master.
*/ */
RS_ZK_REGION_SPLIT (6, ExecutorType.MASTER_SERVER_OPERATIONS), RS_ZK_REGION_SPLIT (6, ExecutorType.MASTER_SERVER_OPERATIONS),
/** /**
@ -77,15 +77,29 @@ public enum EventType {
/** /**
* RS_ZK_REGION_MERGING<br> * RS_ZK_REGION_MERGING<br>
* *
* RS has started merging regions. * RS has started merging regions after master says it's ok to move on.
*/ */
RS_ZK_REGION_MERGING (8, null), RS_ZK_REGION_MERGING (8, null),
/** /**
* RS_ZK_REGION_MERGE<br> * RS_ZK_REGION_MERGE<br>
* *
* RS region merge has completed. * RS region merge has completed and is notifying the master.
*/ */
RS_ZK_REGION_MERGED (9, ExecutorType.MASTER_SERVER_OPERATIONS), RS_ZK_REGION_MERGED (9, ExecutorType.MASTER_SERVER_OPERATIONS),
/**
* RS_ZK_REQUEST_REGION_SPLIT<br>
*
* RS has requested to split a region. This is to notify master
* and check with master if the region is in a state good to split.
*/
RS_ZK_REQUEST_REGION_SPLIT (10, null),
/**
* RS_ZK_REQUEST_REGION_MERGE<br>
*
* RS has requested to merge two regions. This is to notify master
* and check with master if two regions is in states good to merge.
*/
RS_ZK_REQUEST_REGION_MERGE (11, null),
/** /**
* Messages originating from Master to RS.<br> * Messages originating from Master to RS.<br>

View File

@ -52,7 +52,13 @@ public class RegionState implements org.apache.hadoop.io.Writable {
FAILED_OPEN, // failed to open, and won't retry any more FAILED_OPEN, // failed to open, and won't retry any more
FAILED_CLOSE, // failed to close, and won't retry any more FAILED_CLOSE, // failed to close, and won't retry any more
MERGING, // server started merge a region MERGING, // server started merge a region
MERGED // server completed merge a region MERGED, // server completed merge a region
SPLITTING_NEW, // new region to be created when RS splits a parent
// region but hasn't be created yet, or master doesn't
// know it's already created
MERGING_NEW // new region to be created when RS merges two
// daughter regions but hasn't be created yet, or
// master doesn't know it's already created
} }
// Many threads can update the state at the stamp at the same time // Many threads can update the state at the stamp at the same time
@ -134,6 +140,10 @@ public class RegionState implements org.apache.hadoop.io.Writable {
return state == State.SPLIT; return state == State.SPLIT;
} }
public boolean isSplittingNew() {
return state == State.SPLITTING_NEW;
}
public boolean isFailedOpen() { public boolean isFailedOpen() {
return state == State.FAILED_OPEN; return state == State.FAILED_OPEN;
} }
@ -150,10 +160,26 @@ public class RegionState implements org.apache.hadoop.io.Writable {
return state == State.MERGED; return state == State.MERGED;
} }
public boolean isMergingNew() {
return state == State.MERGING_NEW;
}
public boolean isOpenOrMergingOnServer(final ServerName sn) { public boolean isOpenOrMergingOnServer(final ServerName sn) {
return isOnServer(sn) && (isOpened() || isMerging()); return isOnServer(sn) && (isOpened() || isMerging());
} }
public boolean isOpenOrMergingNewOnServer(final ServerName sn) {
return isOnServer(sn) && (isOpened() || isMergingNew());
}
public boolean isOpenOrSplittingOnServer(final ServerName sn) {
return isOnServer(sn) && (isOpened() || isSplitting());
}
public boolean isOpenOrSplittingNewOnServer(final ServerName sn) {
return isOnServer(sn) && (isOpened() || isSplittingNew());
}
public boolean isPendingOpenOrOpeningOnServer(final ServerName sn) { public boolean isPendingOpenOrOpeningOnServer(final ServerName sn) {
return isOnServer(sn) && isPendingOpenOrOpening(); return isOnServer(sn) && isPendingOpenOrOpening();
} }
@ -176,6 +202,28 @@ public class RegionState implements org.apache.hadoop.io.Writable {
return serverName != null && serverName.equals(sn); return serverName != null && serverName.equals(sn);
} }
// Is a region in a state ready to go offline
public boolean isReadyToOffline() {
return isMerged() || isSplit() || isOffline()
|| isSplittingNew() || isMergingNew();
}
// Is a region in a state ready to go online
public boolean isReadyToOnline() {
return isOpened() || isSplittingNew() || isMergingNew();
}
// Is a region in a state not in transition but not unassignable
public boolean isNotUnassignableNotInTransition() {
return isNotUnassignableNotInTransition(state);
}
// Check if a state is not in transition, but not unassignable
public static boolean isNotUnassignableNotInTransition(State state) {
return state == State.MERGED || state == State.SPLIT || state == State.OFFLINE
|| state == State.SPLITTING_NEW || state == State.MERGING_NEW;
}
@Override @Override
public String toString() { public String toString() {
return "{" + hri.getShortNameToLog() return "{" + hri.getShortNameToLog()
@ -245,6 +293,12 @@ public class RegionState implements org.apache.hadoop.io.Writable {
case MERGED: case MERGED:
rs = ClusterStatusProtos.RegionState.State.MERGED; rs = ClusterStatusProtos.RegionState.State.MERGED;
break; break;
case SPLITTING_NEW:
rs = ClusterStatusProtos.RegionState.State.SPLITTING_NEW;
break;
case MERGING_NEW:
rs = ClusterStatusProtos.RegionState.State.MERGING_NEW;
break;
default: default:
throw new IllegalStateException(""); throw new IllegalStateException("");
} }
@ -301,6 +355,12 @@ public class RegionState implements org.apache.hadoop.io.Writable {
case MERGED: case MERGED:
state = State.MERGED; state = State.MERGED;
break; break;
case SPLITTING_NEW:
state = State.SPLITTING_NEW;
break;
case MERGING_NEW:
state = State.MERGING_NEW;
break;
default: default:
throw new IllegalStateException(""); throw new IllegalStateException("");
} }

View File

@ -51,6 +51,11 @@ public class SplitRandomRegionOfTableAction extends Action {
LOG.info("Performing action: Split random region of table " + tableName); LOG.info("Performing action: Split random region of table " + tableName);
List<HRegionInfo> regions = admin.getTableRegions(tableNameBytes); List<HRegionInfo> regions = admin.getTableRegions(tableNameBytes);
if (regions == null || regions.isEmpty()) {
LOG.info("Table " + tableName + " doesn't have regions to split");
return;
}
HRegionInfo region = PolicyBasedChaosMonkey.selectRandomItem( HRegionInfo region = PolicyBasedChaosMonkey.selectRandomItem(
regions.toArray(new HRegionInfo[regions.size()])); regions.toArray(new HRegionInfo[regions.size()]));
LOG.debug("Splitting region " + region.getRegionNameAsString()); LOG.debug("Splitting region " + region.getRegionNameAsString());

View File

@ -273,6 +273,23 @@ public final class ClusterStatusProtos {
* </pre> * </pre>
*/ */
MERGED(12, 12), MERGED(12, 12),
/**
* <code>SPLITTING_NEW = 13;</code>
*
* <pre>
* new region to be created when RS splits a parent
* </pre>
*/
SPLITTING_NEW(13, 13),
/**
* <code>MERGING_NEW = 14;</code>
*
* <pre>
* region but hasn't be created yet, or master doesn't
* know it's already created
* </pre>
*/
MERGING_NEW(14, 14),
; ;
/** /**
@ -379,6 +396,23 @@ public final class ClusterStatusProtos {
* </pre> * </pre>
*/ */
public static final int MERGED_VALUE = 12; public static final int MERGED_VALUE = 12;
/**
* <code>SPLITTING_NEW = 13;</code>
*
* <pre>
* new region to be created when RS splits a parent
* </pre>
*/
public static final int SPLITTING_NEW_VALUE = 13;
/**
* <code>MERGING_NEW = 14;</code>
*
* <pre>
* region but hasn't be created yet, or master doesn't
* know it's already created
* </pre>
*/
public static final int MERGING_NEW_VALUE = 14;
public final int getNumber() { return value; } public final int getNumber() { return value; }
@ -398,6 +432,8 @@ public final class ClusterStatusProtos {
case 10: return FAILED_CLOSE; case 10: return FAILED_CLOSE;
case 11: return MERGING; case 11: return MERGING;
case 12: return MERGED; case 12: return MERGED;
case 13: return SPLITTING_NEW;
case 14: return MERGING_NEW;
default: return null; default: return null;
} }
} }
@ -10303,48 +10339,49 @@ public final class ClusterStatusProtos {
static { static {
java.lang.String[] descriptorData = { java.lang.String[] descriptorData = {
"\n\023ClusterStatus.proto\032\013HBase.proto\032\017Clus" + "\n\023ClusterStatus.proto\032\013HBase.proto\032\017Clus" +
"terId.proto\032\010FS.proto\"\243\002\n\013RegionState\022 \n" + "terId.proto\032\010FS.proto\"\307\002\n\013RegionState\022 \n" +
"\013region_info\030\001 \002(\0132\013.RegionInfo\022!\n\005state" + "\013region_info\030\001 \002(\0132\013.RegionInfo\022!\n\005state" +
"\030\002 \002(\0162\022.RegionState.State\022\r\n\005stamp\030\003 \001(" + "\030\002 \002(\0162\022.RegionState.State\022\r\n\005stamp\030\003 \001(" +
"\004\"\277\001\n\005State\022\013\n\007OFFLINE\020\000\022\020\n\014PENDING_OPEN" + "\004\"\343\001\n\005State\022\013\n\007OFFLINE\020\000\022\020\n\014PENDING_OPEN" +
"\020\001\022\013\n\007OPENING\020\002\022\010\n\004OPEN\020\003\022\021\n\rPENDING_CLO" + "\020\001\022\013\n\007OPENING\020\002\022\010\n\004OPEN\020\003\022\021\n\rPENDING_CLO" +
"SE\020\004\022\013\n\007CLOSING\020\005\022\n\n\006CLOSED\020\006\022\r\n\tSPLITTI" + "SE\020\004\022\013\n\007CLOSING\020\005\022\n\n\006CLOSED\020\006\022\r\n\tSPLITTI" +
"NG\020\007\022\t\n\005SPLIT\020\010\022\017\n\013FAILED_OPEN\020\t\022\020\n\014FAIL" + "NG\020\007\022\t\n\005SPLIT\020\010\022\017\n\013FAILED_OPEN\020\t\022\020\n\014FAIL" +
"ED_CLOSE\020\n\022\013\n\007MERGING\020\013\022\n\n\006MERGED\020\014\"X\n\022R" + "ED_CLOSE\020\n\022\013\n\007MERGING\020\013\022\n\n\006MERGED\020\014\022\021\n\rS" +
"egionInTransition\022\036\n\004spec\030\001 \002(\0132\020.Region", "PLITTING_NEW\020\r\022\017\n\013MERGING_NEW\020\016\"X\n\022Regio",
"Specifier\022\"\n\014region_state\030\002 \002(\0132\014.Region" + "nInTransition\022\036\n\004spec\030\001 \002(\0132\020.RegionSpec" +
"State\"\320\003\n\nRegionLoad\022*\n\020region_specifier" + "ifier\022\"\n\014region_state\030\002 \002(\0132\014.RegionStat" +
"\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006stores\030\002 \001(\r" + "e\"\320\003\n\nRegionLoad\022*\n\020region_specifier\030\001 \002" +
"\022\022\n\nstorefiles\030\003 \001(\r\022\"\n\032store_uncompress" + "(\0132\020.RegionSpecifier\022\016\n\006stores\030\002 \001(\r\022\022\n\n" +
"ed_size_MB\030\004 \001(\r\022\031\n\021storefile_size_MB\030\005 " + "storefiles\030\003 \001(\r\022\"\n\032store_uncompressed_s" +
"\001(\r\022\030\n\020memstore_size_MB\030\006 \001(\r\022\037\n\027storefi" + "ize_MB\030\004 \001(\r\022\031\n\021storefile_size_MB\030\005 \001(\r\022" +
"le_index_size_MB\030\007 \001(\r\022\033\n\023read_requests_" + "\030\n\020memstore_size_MB\030\006 \001(\r\022\037\n\027storefile_i" +
"count\030\010 \001(\004\022\034\n\024write_requests_count\030\t \001(" + "ndex_size_MB\030\007 \001(\r\022\033\n\023read_requests_coun" +
"\004\022\034\n\024total_compacting_KVs\030\n \001(\004\022\035\n\025curre" + "t\030\010 \001(\004\022\034\n\024write_requests_count\030\t \001(\004\022\034\n" +
"nt_compacted_KVs\030\013 \001(\004\022\032\n\022root_index_siz", "\024total_compacting_KVs\030\n \001(\004\022\035\n\025current_c",
"e_KB\030\014 \001(\r\022\"\n\032total_static_index_size_KB" + "ompacted_KVs\030\013 \001(\004\022\032\n\022root_index_size_KB" +
"\030\r \001(\r\022\"\n\032total_static_bloom_size_KB\030\016 \001" + "\030\014 \001(\r\022\"\n\032total_static_index_size_KB\030\r \001" +
"(\r\022\034\n\024complete_sequence_id\030\017 \001(\004\"\212\002\n\nSer" + "(\r\022\"\n\032total_static_bloom_size_KB\030\016 \001(\r\022\034" +
"verLoad\022\032\n\022number_of_requests\030\001 \001(\r\022 \n\030t" + "\n\024complete_sequence_id\030\017 \001(\004\"\212\002\n\nServerL" +
"otal_number_of_requests\030\002 \001(\r\022\024\n\014used_he" + "oad\022\032\n\022number_of_requests\030\001 \001(\r\022 \n\030total" +
"ap_MB\030\003 \001(\r\022\023\n\013max_heap_MB\030\004 \001(\r\022!\n\014regi" + "_number_of_requests\030\002 \001(\r\022\024\n\014used_heap_M" +
"on_loads\030\005 \003(\0132\013.RegionLoad\022\"\n\014coprocess" + "B\030\003 \001(\r\022\023\n\013max_heap_MB\030\004 \001(\r\022!\n\014region_l" +
"ors\030\006 \003(\0132\014.Coprocessor\022\031\n\021report_start_" + "oads\030\005 \003(\0132\013.RegionLoad\022\"\n\014coprocessors\030" +
"time\030\007 \001(\004\022\027\n\017report_end_time\030\010 \001(\004\022\030\n\020i" + "\006 \003(\0132\014.Coprocessor\022\031\n\021report_start_time" +
"nfo_server_port\030\t \001(\r\"O\n\016LiveServerInfo\022", "\030\007 \001(\004\022\027\n\017report_end_time\030\010 \001(\004\022\030\n\020info_",
"\033\n\006server\030\001 \002(\0132\013.ServerName\022 \n\013server_l" + "server_port\030\t \001(\r\"O\n\016LiveServerInfo\022\033\n\006s" +
"oad\030\002 \002(\0132\013.ServerLoad\"\340\002\n\rClusterStatus" + "erver\030\001 \002(\0132\013.ServerName\022 \n\013server_load\030" +
"\022/\n\rhbase_version\030\001 \001(\0132\030.HBaseVersionFi" + "\002 \002(\0132\013.ServerLoad\"\340\002\n\rClusterStatus\022/\n\r" +
"leContent\022%\n\014live_servers\030\002 \003(\0132\017.LiveSe" + "hbase_version\030\001 \001(\0132\030.HBaseVersionFileCo" +
"rverInfo\022!\n\014dead_servers\030\003 \003(\0132\013.ServerN" + "ntent\022%\n\014live_servers\030\002 \003(\0132\017.LiveServer" +
"ame\0222\n\025regions_in_transition\030\004 \003(\0132\023.Reg" + "Info\022!\n\014dead_servers\030\003 \003(\0132\013.ServerName\022" +
"ionInTransition\022\036\n\ncluster_id\030\005 \001(\0132\n.Cl" + "2\n\025regions_in_transition\030\004 \003(\0132\023.RegionI" +
"usterId\022)\n\023master_coprocessors\030\006 \003(\0132\014.C" + "nTransition\022\036\n\ncluster_id\030\005 \001(\0132\n.Cluste" +
"oprocessor\022\033\n\006master\030\007 \001(\0132\013.ServerName\022" + "rId\022)\n\023master_coprocessors\030\006 \003(\0132\014.Copro" +
"#\n\016backup_masters\030\010 \003(\0132\013.ServerName\022\023\n\013", "cessor\022\033\n\006master\030\007 \001(\0132\013.ServerName\022#\n\016b",
"balancer_on\030\t \001(\010BF\n*org.apache.hadoop.h" + "ackup_masters\030\010 \003(\0132\013.ServerName\022\023\n\013bala" +
"base.protobuf.generatedB\023ClusterStatusPr" + "ncer_on\030\t \001(\010BF\n*org.apache.hadoop.hbase" +
"otosH\001\240\001\001" ".protobuf.generatedB\023ClusterStatusProtos" +
"H\001\240\001\001"
}; };
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

View File

@ -45,6 +45,12 @@ message RegionState {
FAILED_CLOSE = 10; // failed to close, and won't retry any more FAILED_CLOSE = 10; // failed to close, and won't retry any more
MERGING = 11; // server started merge a region MERGING = 11; // server started merge a region
MERGED = 12; // server completed merge of a region MERGED = 12; // server completed merge of a region
SPLITTING_NEW = 13; // new region to be created when RS splits a parent
// region but hasn't be created yet, or master doesn't
// know it's already created
MERGING_NEW = 14; // new region to be created when RS merges two
// daughter regions but hasn't be created yet, or
// master doesn't know it's already created
} }
} }

View File

@ -68,15 +68,16 @@ import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler; import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
import org.apache.hadoop.hbase.master.handler.DisableTableHandler; import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
import org.apache.hadoop.hbase.master.handler.EnableTableHandler; import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
import org.apache.hadoop.hbase.master.handler.MergedRegionHandler;
import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler; import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
import org.apache.hadoop.hbase.master.handler.SplitRegionHandler;
import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException; import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState; import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.regionserver.SplitTransaction;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.KeyLocker; import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Triple; import org.apache.hadoop.hbase.util.Triple;
import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker; import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
@ -147,6 +148,12 @@ public class AssignmentManager extends ZooKeeperListener {
*/ */
private final int maximumAttempts; private final int maximumAttempts;
/**
* Map of two merging regions from the region to be created.
*/
private final Map<String, PairOfSameType<HRegionInfo>> mergingRegions
= new HashMap<String, PairOfSameType<HRegionInfo>>();
/** /**
* The sleep time for which the assignment will wait before retrying in case of hbase:meta assignment * The sleep time for which the assignment will wait before retrying in case of hbase:meta assignment
* failure due to lack of availability of region plan * failure due to lack of availability of region plan
@ -176,9 +183,6 @@ public class AssignmentManager extends ZooKeeperListener {
// For unit tests, keep track of calls to OpenedRegionHandler // For unit tests, keep track of calls to OpenedRegionHandler
private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled = null; private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled = null;
// For unit tests, keep track of calls to SplitRegionHandler
private AtomicBoolean splitRegionHandlerCalled = null;
//Thread pool executor service for timeout monitor //Thread pool executor service for timeout monitor
private java.util.concurrent.ExecutorService threadPoolExecutorService; private java.util.concurrent.ExecutorService threadPoolExecutorService;
@ -225,6 +229,12 @@ public class AssignmentManager extends ZooKeeperListener {
private final ConcurrentHashMap<String, AtomicInteger> private final ConcurrentHashMap<String, AtomicInteger>
failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>(); failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
/**
* For testing only! Set to true to skip handling of split.
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
public static boolean TEST_SKIP_SPLIT_HANDLING = false;
/** /**
* Constructs a new assignment manager. * Constructs a new assignment manager.
* *
@ -552,8 +562,17 @@ public class AssignmentManager extends ZooKeeperListener {
} }
HRegionInfo hri = regionInfo; HRegionInfo hri = regionInfo;
if (hri == null) { if (hri == null) {
// Get the region from region states map/meta. However, we
// may still can't get it, for example, for online region merge,
// the znode uses the new region to be created, which may not in meta
// yet if the merging is still going on during the master recovery.
hri = regionStates.getRegionInfo(rt.getRegionName()); hri = regionStates.getRegionInfo(rt.getRegionName());
if (hri == null) return false; EventType et = rt.getEventType();
if (hri == null && et != EventType.RS_ZK_REGION_MERGING
&& et != EventType.RS_ZK_REQUEST_REGION_MERGE) {
LOG.warn("Couldn't find the region in recovering " + rt);
return false;
}
} }
processRegionsInTransition(rt, hri, stat.getVersion()); processRegionsInTransition(rt, hri, stat.getVersion());
return true; return true;
@ -575,12 +594,12 @@ public class AssignmentManager extends ZooKeeperListener {
EventType et = rt.getEventType(); EventType et = rt.getEventType();
// Get ServerName. Could not be null. // Get ServerName. Could not be null.
final ServerName sn = rt.getServerName(); final ServerName sn = rt.getServerName();
final String encodedRegionName = regionInfo.getEncodedName(); final byte[] regionName = rt.getRegionName();
final String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedRegionName); final String encodedName = HRegionInfo.encodeRegionName(regionName);
LOG.info("Processing " + regionInfo.getRegionNameAsString() + " in state " + et); final String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
LOG.info("Processing " + prettyPrintedRegionName + " in state " + et);
if (regionStates.isRegionInTransition(encodedName)) {
if (regionStates.isRegionInTransition(encodedRegionName)) {
// Just return // Just return
return; return;
} }
@ -637,7 +656,7 @@ public class AssignmentManager extends ZooKeeperListener {
ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName()); ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
try { try {
RegionPlan plan = new RegionPlan(regionInfo, null, sn); RegionPlan plan = new RegionPlan(regionInfo, null, sn);
addPlan(encodedRegionName, plan); addPlan(encodedName, plan);
assign(rs, false, false); assign(rs, false, false);
} finally { } finally {
lock.unlock(); lock.unlock();
@ -666,69 +685,32 @@ public class AssignmentManager extends ZooKeeperListener {
new OpenedRegionHandler(server, this, regionInfo, sn, expectedVersion).process(); new OpenedRegionHandler(server, this, regionInfo, sn, expectedVersion).process();
} }
break; break;
case RS_ZK_REQUEST_REGION_SPLIT:
case RS_ZK_REGION_SPLITTING: case RS_ZK_REGION_SPLITTING:
if (!serverManager.isServerOnline(sn)) {
// The regionserver started the split, but died before updating the status.
// It means (hopefully) that the split was not finished
// TBD - to study. In the meantime, do nothing as in the past.
LOG.warn("Processed region " + prettyPrintedRegionName + " in state : " + et +
" on a dead regionserver: " + sn + " doing nothing");
} else {
// Splitting region should be online. We could have skipped it during
// user region rebuilding since we may consider the split is completed.
// Put it in SPLITTING state to avoid complications.
regionStates.regionOnline(regionInfo, sn);
regionStates.updateRegionState(rt, State.SPLITTING);
LOG.info("Processed " + prettyPrintedRegionName + " in state : " + et);
}
break;
case RS_ZK_REGION_SPLIT: case RS_ZK_REGION_SPLIT:
if (!serverManager.isServerOnline(sn)) { if (serverManager.isServerOnline(sn)) {
// The region is already in SPLIT state, do nothing
LOG.warn("Processed " + prettyPrintedRegionName
+ " in state : " + et + " on a dead regionserver: " + sn
+ " doing nothing");
} else {
// Splitting region should be online. We could have skipped it during // Splitting region should be online. We could have skipped it during
// user region rebuilding since we may consider the split is completed. // user region rebuilding since we may consider the split is completed.
// Put it in SPLITTING state to avoid complications. // Put it in SPLITTING state to avoid complications.
regionStates.regionOnline(regionInfo, sn); regionStates.regionOnline(regionInfo, sn);
regionStates.updateRegionState(rt, State.SPLITTING); regionStates.updateRegionState(rt, State.SPLITTING);
LOG.info("Processed " + prettyPrintedRegionName + " in state : " + et);
// Move the region to splitting state. The regionserver is supposed to update the znode
// multiple times so if it's still up we will receive an update soon.
} }
break; if (!handleRegionSplitting(
case RS_ZK_REGION_MERGING: rt, encodedName, prettyPrintedRegionName, sn)) {
if (!serverManager.isServerOnline(sn)) { deleteSplittingNode(encodedName);
// The regionserver started the merge, but died before updating the status. }
// It means (hopefully) that the merge was not finished
// This node should be gone soon since it is ephemeral.
LOG.warn("Processed " + prettyPrintedRegionName + " in state : " + et +
" on a dead regionserver: " + sn + " doing nothing");
} else {
handleRegionMerging(rt, prettyPrintedRegionName, sn);
LOG.info("Processed region " + prettyPrintedRegionName LOG.info("Processed region " + prettyPrintedRegionName
+ " in state : " + et); + " in state : " + et);
}
break; break;
case RS_ZK_REQUEST_REGION_MERGE:
case RS_ZK_REGION_MERGING:
case RS_ZK_REGION_MERGED: case RS_ZK_REGION_MERGED:
if (!serverManager.isServerOnline(sn)) { if (!handleRegionMerging(
// Do nothing, merging regions are already removed from meta, rt, encodedName, prettyPrintedRegionName, sn)) {
// so they are not in region states map any more. deleteMergingNode(encodedName);
// The new region will be assigned by the ServerShutdownHandler
LOG.warn("Processed " + prettyPrintedRegionName
+ " in state : " + et + " on a dead regionserver: " + sn
+ " doing nothing");
} else {
// Merging regions are already removed from meta. It doesn't hurt to
// do nothing here, no need to set them to merging state here. We are fine
// to put the new region to online state during user region rebuilding.
LOG.info("Processed " + prettyPrintedRegionName + " in state : " +
et + " nothing to do.");
// We don't do anything. The regionserver is supposed to update the znode
// multiple times so if it's still up we will receive an update soon.
} }
LOG.info("Processed region " + prettyPrintedRegionName
+ " in state : " + et);
break; break;
default: default:
throw new IllegalStateException("Received region in state :" + et + " is not valid."); throw new IllegalStateException("Received region in state :" + et + " is not valid.");
@ -811,7 +793,7 @@ public class AssignmentManager extends ZooKeeperListener {
} }
RegionState regionState = RegionState regionState =
regionStates.getRegionTransitionState(encodedName); regionStates.getRegionState(encodedName);
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
boolean lateEvent = createTime < (startTime - 15000); boolean lateEvent = createTime < (startTime - 15000);
@ -831,7 +813,7 @@ public class AssignmentManager extends ZooKeeperListener {
Lock lock = locker.acquireLock(encodedName); Lock lock = locker.acquireLock(encodedName);
try { try {
RegionState latestState = RegionState latestState =
regionStates.getRegionTransitionState(encodedName); regionStates.getRegionState(encodedName);
if ((regionState == null && latestState != null) if ((regionState == null && latestState != null)
|| (regionState != null && latestState == null) || (regionState != null && latestState == null)
|| (regionState != null && latestState != null || (regionState != null && latestState != null
@ -845,83 +827,24 @@ public class AssignmentManager extends ZooKeeperListener {
} }
regionState = latestState; regionState = latestState;
switch (rt.getEventType()) { switch (rt.getEventType()) {
case RS_ZK_REQUEST_REGION_SPLIT:
case RS_ZK_REGION_SPLITTING: case RS_ZK_REGION_SPLITTING:
if (!isInStateForSplitting(regionState)) break;
regionStates.updateRegionState(rt, State.SPLITTING);
break;
case RS_ZK_REGION_SPLIT: case RS_ZK_REGION_SPLIT:
// RegionState must be null, or SPLITTING or PENDING_CLOSE. if (!handleRegionSplitting(
if (!isInStateForSplitting(regionState)) break; rt, encodedName, prettyPrintedRegionName, sn)) {
// If null, add SPLITTING state before going to SPLIT deleteSplittingNode(encodedName);
if (regionState == null) {
regionState = regionStates.updateRegionState(rt, State.SPLITTING);
String message = "Received SPLIT for region " + prettyPrintedRegionName +
" from server " + sn;
// If still null, it means we cannot find it and it was already processed
if (regionState == null) {
LOG.warn(message + " but it doesn't exist anymore," +
" probably already processed its split");
break;
} }
LOG.info(message +
" but region was not first in SPLITTING state; continuing");
}
// Check it has daughters.
byte [] payload = rt.getPayload();
List<HRegionInfo> daughters;
try {
daughters = HRegionInfo.parseDelimitedFrom(payload, 0, payload.length);
} catch (IOException e) {
LOG.error("Dropped split! Failed reading split payload for " +
prettyPrintedRegionName);
break;
}
assert daughters.size() == 2;
// Assert that we can get a serverinfo for this server.
if (!this.serverManager.isServerOnline(sn)) {
LOG.error("Dropped split! ServerName=" + sn + " unknown.");
break;
}
// Run handler to do the rest of the SPLIT handling.
new SplitRegionHandler(server, this, regionState.getRegion(), sn, daughters).process();
updateSplitHandlerTracker();
break; break;
case RS_ZK_REQUEST_REGION_MERGE:
case RS_ZK_REGION_MERGING: case RS_ZK_REGION_MERGING:
case RS_ZK_REGION_MERGED:
// Merged region is a new region, we can't find it in the region states now. // Merged region is a new region, we can't find it in the region states now.
// However, the two merging regions are not new. They should be in state for merging. // However, the two merging regions are not new. They should be in state for merging.
handleRegionMerging(rt, prettyPrintedRegionName, sn); if (!handleRegionMerging(
break; rt, encodedName, prettyPrintedRegionName, sn)) {
deleteMergingNode(encodedName);
case RS_ZK_REGION_MERGED:
// Assert that we can get a serverinfo for this server.
if (!this.serverManager.isServerOnline(sn)) {
LOG.error("Dropped merge! ServerName=" + sn + " unknown.");
break;
} }
// Get merged and merging regions.
byte[] payloadOfMerge = rt.getPayload();
List<HRegionInfo> mergeRegions;
try {
mergeRegions = HRegionInfo.parseDelimitedFrom(payloadOfMerge, 0,
payloadOfMerge.length);
} catch (IOException e) {
LOG.error("Dropped merge! Failed reading merge payload for " +
prettyPrintedRegionName);
break;
}
assert mergeRegions.size() == 3;
HRegionInfo merge_a = mergeRegions.get(1);
HRegionInfo merge_b = mergeRegions.get(2);
if (!isInStateForMerging(sn, merge_a, merge_b)) {
// Move on. Merge already happened (passed PONR), no point to stop now
LOG.warn("Got merge event, but not in state good for MERGED; rs_a="
+ merge_a + ", rs_b=" + merge_b);
}
// Run handler to do the rest of the MERGED handling.
new MergedRegionHandler(server, this, sn, mergeRegions).process();
break; break;
case M_ZK_REGION_CLOSING: case M_ZK_REGION_CLOSING:
@ -1055,19 +978,10 @@ public class AssignmentManager extends ZooKeeperListener {
return b == null ? false : b.compareAndSet(true, false); return b == null ? false : b.compareAndSet(true, false);
} }
//For unit tests only
boolean wasSplitHandlerCalled() {
//compareAndSet to be sure that unit tests don't see stale values. Means,
//we will return true exactly once unless the handler code resets to true
//this value.
return splitRegionHandlerCalled.compareAndSet(true, false);
}
//For unit tests only //For unit tests only
void initializeHandlerTrackers() { void initializeHandlerTrackers() {
closedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>(); closedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
openedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>(); openedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
splitRegionHandlerCalled = new AtomicBoolean(false);
} }
void updateClosedRegionHandlerTracker(HRegionInfo hri) { void updateClosedRegionHandlerTracker(HRegionInfo hri) {
@ -1082,36 +996,6 @@ public class AssignmentManager extends ZooKeeperListener {
} }
} }
void updateSplitHandlerTracker() {
if (splitRegionHandlerCalled != null) { //only for unit tests this is true
splitRegionHandlerCalled.set(true);
}
}
/**
* @return Returns true if this RegionState is splittable; i.e. the
* RegionState is currently in splitting state or pending_close or
* null (Anything else will return false). (Anything else will return false).
*/
private boolean isInStateForSplitting(final RegionState rs) {
if (rs == null) return true;
if (rs.isSplitting()) return true;
if (convertPendingCloseToSplitting(rs)) return true;
LOG.warn("Dropped region split! Not in state good for SPLITTING; rs=" + rs);
return false;
}
/**
* @return Returns true if both regions are merging/open on specified server
*/
private boolean isInStateForMerging(final ServerName sn,
final HRegionInfo a, final HRegionInfo b) {
RegionState rs_a = regionStates.getRegionState(a);
RegionState rs_b = regionStates.getRegionState(b);
return ((rs_a == null || rs_a.isOpenOrMergingOnServer(sn))
&& (rs_b == null || rs_b.isOpenOrMergingOnServer(sn)));
}
// TODO: processFavoredNodes might throw an exception, for e.g., if the // TODO: processFavoredNodes might throw an exception, for e.g., if the
// meta could not be contacted/updated. We need to see how seriously to treat // meta could not be contacted/updated. We need to see how seriously to treat
// this problem as. Should we fail the current assignment. We should be able // this problem as. Should we fail the current assignment. We should be able
@ -1130,25 +1014,6 @@ public class AssignmentManager extends ZooKeeperListener {
FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes, catalogTracker); FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes, catalogTracker);
} }
/**
* If the passed regionState is in PENDING_CLOSE, clean up PENDING_CLOSE
* state and convert it to SPLITTING instead.
* This can happen in case where master wants to close a region at same time
* a regionserver starts a split. The split won. Clean out old PENDING_CLOSE
* state.
* @param rs
* @return True if we converted from PENDING_CLOSE to SPLITTING
*/
private boolean convertPendingCloseToSplitting(final RegionState rs) {
if (!rs.isPendingClose()) return false;
LOG.debug("Converting PENDING_CLOSE to SPLITTING; rs=" + rs);
regionStates.updateRegionState(rs.getRegion(), State.SPLITTING);
// Clean up existing state. Clear from region plans seems all we
// have to do here by way of clean up of PENDING_CLOSE.
clearRegionPlan(rs.getRegion());
return true;
}
/** /**
* Handle a ZK unassigned node transition triggered by HBCK repair tool. * Handle a ZK unassigned node transition triggered by HBCK repair tool.
* <p> * <p>
@ -1305,38 +1170,56 @@ public class AssignmentManager extends ZooKeeperListener {
Lock lock = locker.acquireLock(regionName); Lock lock = locker.acquireLock(regionName);
try { try {
RegionState rs = regionStates.getRegionTransitionState(regionName); RegionState rs = regionStates.getRegionTransitionState(regionName);
if (rs == null) return; if (rs == null) {
rs = regionStates.getRegionState(regionName);
if (rs == null || !rs.isMergingNew()) {
// MergingNew is an offline state
return;
}
}
HRegionInfo regionInfo = rs.getRegion(); HRegionInfo regionInfo = rs.getRegion();
String regionNameStr = regionInfo.getRegionNameAsString(); String regionNameStr = regionInfo.getRegionNameAsString();
LOG.debug("Znode " + regionNameStr + " deleted, state: " + rs); LOG.debug("Znode " + regionNameStr + " deleted, state: " + rs);
if (rs.isOpened()) {
ServerName serverName = rs.getServerName();
regionOnline(regionInfo, serverName);
boolean disabled = getZKTable().isDisablingOrDisabledTable(regionInfo.getTable()); boolean disabled = getZKTable().isDisablingOrDisabledTable(regionInfo.getTable());
if (!serverManager.isServerOnline(serverName) && !disabled) { ServerName serverName = rs.getServerName();
LOG.info("Opened " + regionNameStr if (serverManager.isServerOnline(serverName)) {
+ "but the region server is offline, reassign the region"); if (rs.isOnServer(serverName)
assign(regionInfo, true); && (rs.isOpened() || rs.isSplitting())) {
} else if (disabled) { regionOnline(regionInfo, serverName);
if (disabled) {
// if server is offline, no hurt to unassign again // if server is offline, no hurt to unassign again
LOG.info("Opened " + regionNameStr LOG.info("Opened " + regionNameStr
+ "but this table is disabled, triggering close of region"); + "but this table is disabled, triggering close of region");
unassign(regionInfo); unassign(regionInfo);
} }
} else if (rs.isSplitting()) { } else if (rs.isMergingNew()) {
LOG.debug("Ephemeral node deleted. Found in SPLITTING state. " + "Removing from RIT " synchronized (regionStates) {
+ rs.getRegion()); String p = regionInfo.getEncodedName();
// it can be either SPLIT fail, or RS dead. PairOfSameType<HRegionInfo> regions = mergingRegions.get(p);
regionStates.regionOnline(rs.getRegion(), rs.getServerName()); if (regions != null) {
onlineMergingRegion(disabled, regions.getFirst(), serverName);
onlineMergingRegion(disabled, regions.getSecond(), serverName);
}
}
}
} }
// RS does not delete the znode in case SPLIT, it only means RS died which
// will be handled by SSH
// in region merge we do not put merging regions to MERGING state
} finally { } finally {
lock.unlock(); lock.unlock();
} }
} }
private void onlineMergingRegion(boolean disabled,
final HRegionInfo hri, final ServerName serverName) {
RegionState regionState = regionStates.getRegionState(hri);
if (regionState != null && regionState.isMerging()
&& regionState.isOnServer(serverName)) {
regionOnline(regionState.getRegion(), serverName);
if (disabled) {
unassign(hri);
}
}
}
}); });
} }
} }
@ -1371,23 +1254,7 @@ public class AssignmentManager extends ZooKeeperListener {
// on it, so no need to watch it again. So, as I know for now, // on it, so no need to watch it again. So, as I know for now,
// this is needed to watch splitting nodes only. // this is needed to watch splitting nodes only.
if (!regionStates.isRegionInTransition(child)) { if (!regionStates.isRegionInTransition(child)) {
stat.setVersion(0); ZKAssign.getDataAndWatch(watcher, child, stat);
byte[] data = ZKAssign.getDataAndWatch(watcher,
ZKUtil.joinZNode(watcher.assignmentZNode, child), stat);
if (data != null && stat.getVersion() > 0) {
try {
RegionTransition rt = RegionTransition.parseFrom(data);
//See HBASE-7551, handle splitting too, in case we miss the node change event
EventType type = rt.getEventType();
if (type == EventType.RS_ZK_REGION_SPLITTING
|| type == EventType.RS_ZK_REGION_MERGING) {
handleRegion(rt, stat.getVersion());
}
} catch (DeserializationException de) {
LOG.error("error getting data for " + child, de);
}
}
} }
} }
} }
@ -2374,7 +2241,8 @@ public class AssignmentManager extends ZooKeeperListener {
*/ */
public void unassign(HRegionInfo region, boolean force, ServerName dest) { public void unassign(HRegionInfo region, boolean force, ServerName dest) {
// TODO: Method needs refactoring. Ugly buried returns throughout. Beware! // TODO: Method needs refactoring. Ugly buried returns throughout. Beware!
LOG.debug("Starting unassign of " + region.getRegionNameAsString() + " (offlining)"); LOG.debug("Starting unassign of " + region.getRegionNameAsString()
+ " (offlining), current state: " + regionStates.getRegionState(region));
String encodedName = region.getEncodedName(); String encodedName = region.getEncodedName();
// Grab the state of this region and synchronize on it // Grab the state of this region and synchronize on it
@ -2389,8 +2257,7 @@ public class AssignmentManager extends ZooKeeperListener {
// Region is not in transition. // Region is not in transition.
// We can unassign it only if it's not SPLIT/MERGED. // We can unassign it only if it's not SPLIT/MERGED.
state = regionStates.getRegionState(encodedName); state = regionStates.getRegionState(encodedName);
if (state != null && (state.isMerged() if (state != null && state.isNotUnassignableNotInTransition()) {
|| state.isSplit() || state.isOffline())) {
LOG.info("Attempting to unassign " + state + ", ignored"); LOG.info("Attempting to unassign " + state + ", ignored");
// Offline region will be reassigned below // Offline region will be reassigned below
return; return;
@ -2484,27 +2351,9 @@ public class AssignmentManager extends ZooKeeperListener {
* @param region regioninfo of znode to be deleted. * @param region regioninfo of znode to be deleted.
*/ */
public void deleteClosingOrClosedNode(HRegionInfo region) { public void deleteClosingOrClosedNode(HRegionInfo region) {
String encodedName = region.getEncodedName(); String regionName = region.getEncodedName();
try { deleteNodeInStates(regionName, "closing", EventType.M_ZK_REGION_CLOSING,
if (!ZKAssign.deleteNode(watcher, encodedName, EventType.RS_ZK_REGION_CLOSED);
EventType.M_ZK_REGION_CLOSING)) {
boolean deleteNode = ZKAssign.deleteNode(watcher,
encodedName, EventType.RS_ZK_REGION_CLOSED);
// TODO : We don't abort if the delete node returns false. Is there any
// such corner case?
if (!deleteNode) {
LOG.error("The deletion of the CLOSED node for "
+ encodedName + " returned " + deleteNode);
}
}
} catch (NoNodeException e) {
LOG.debug("CLOSING/CLOSED node for " + encodedName
+ " already deleted");
} catch (KeeperException ke) {
server.abort(
"Unexpected ZK exception deleting node CLOSING/CLOSED for the region "
+ encodedName, ke);
}
} }
/** /**
@ -2519,16 +2368,22 @@ public class AssignmentManager extends ZooKeeperListener {
// This may fail if the SPLIT or SPLITTING or MERGED or MERGING znode gets // This may fail if the SPLIT or SPLITTING or MERGED or MERGING znode gets
// cleaned up before we can get data from it. // cleaned up before we can get data from it.
byte [] data = ZKAssign.getData(watcher, path); byte [] data = ZKAssign.getData(watcher, path);
if (data == null) return false; if (data == null) {
LOG.info("Node " + path + " is gone");
return false;
}
RegionTransition rt = RegionTransition.parseFrom(data); RegionTransition rt = RegionTransition.parseFrom(data);
switch (rt.getEventType()) { switch (rt.getEventType()) {
case RS_ZK_REQUEST_REGION_SPLIT:
case RS_ZK_REGION_SPLIT: case RS_ZK_REGION_SPLIT:
case RS_ZK_REGION_SPLITTING: case RS_ZK_REGION_SPLITTING:
case RS_ZK_REQUEST_REGION_MERGE:
case RS_ZK_REGION_MERGED: case RS_ZK_REGION_MERGED:
case RS_ZK_REGION_MERGING: case RS_ZK_REGION_MERGING:
result = true; result = true;
break; break;
default: default:
LOG.info("Node " + path + " is in " + rt.getEventType());
break; break;
} }
return result; return result;
@ -2818,6 +2673,7 @@ public class AssignmentManager extends ZooKeeperListener {
// Region is being served and on an active server // Region is being served and on an active server
// add only if region not in disabled or enabling table // add only if region not in disabled or enabling table
if (!disabledOrEnablingTables.contains(tableName)) { if (!disabledOrEnablingTables.contains(tableName)) {
regionStates.updateRegionState(regionInfo, State.OPEN, regionLocation);
regionStates.regionOnline(regionInfo, regionLocation); regionStates.regionOnline(regionInfo, regionLocation);
} }
// need to enable the table if not disabled or disabling or enabling // need to enable the table if not disabled or disabling or enabling
@ -3275,8 +3131,9 @@ public class AssignmentManager extends ZooKeeperListener {
server.abort("Unexpected ZK exception deleting node " + hri, ke); server.abort("Unexpected ZK exception deleting node " + hri, ke);
} }
if (zkTable.isDisablingOrDisabledTable(hri.getTable())) { if (zkTable.isDisablingOrDisabledTable(hri.getTable())) {
it.remove(); regionStates.updateRegionState(hri, State.OFFLINE);
regionStates.regionOffline(hri); regionStates.regionOffline(hri);
it.remove();
continue; continue;
} }
// Mark the region offline and assign it again by SSH // Mark the region offline and assign it again by SSH
@ -3289,38 +3146,6 @@ public class AssignmentManager extends ZooKeeperListener {
return regions; return regions;
} }
/**
* Update inmemory structures.
* @param sn Server that reported the split
* @param parent Parent region that was split
* @param a Daughter region A
* @param b Daughter region B
*/
public void handleSplitReport(final ServerName sn, final HRegionInfo parent,
final HRegionInfo a, final HRegionInfo b) {
synchronized (regionStates) {
regionOffline(parent, State.SPLIT);
onlineNewRegion(a, sn);
onlineNewRegion(b, sn);
}
}
/**
* Update inmemory structures.
* @param sn Server that reported the merge
* @param merged regioninfo of merged
* @param a region a
* @param b region b
*/
public void handleRegionsMergeReport(final ServerName sn,
final HRegionInfo merged, final HRegionInfo a, final HRegionInfo b) {
synchronized (regionStates) {
regionOffline(a, State.MERGED);
regionOffline(b, State.MERGED);
onlineNewRegion(merged, sn);
}
}
/** /**
* @param plan Plan to execute. * @param plan Plan to execute.
*/ */
@ -3397,33 +3222,283 @@ public class AssignmentManager extends ZooKeeperListener {
return true; return true;
} }
private boolean deleteNodeInStates(
String regionName, String desc, EventType... types) {
try {
for (EventType et: types) {
if (ZKAssign.deleteNode(watcher, regionName, et)) {
return true;
}
}
LOG.info("Failed to delete the " + desc + " node for "
+ regionName + ". The node type may not match");
} catch (NoNodeException e) {
LOG.debug("The " + desc + " node for " + regionName + " already deleted");
} catch (KeeperException ke) {
server.abort("Unexpected ZK exception deleting " + desc
+ " node for the region " + regionName, ke);
}
return false;
}
private void deleteMergingNode(String encodedName) {
deleteNodeInStates(encodedName, "merging", EventType.RS_ZK_REGION_MERGING,
EventType.RS_ZK_REQUEST_REGION_MERGE, EventType.RS_ZK_REGION_MERGED);
}
private void deleteSplittingNode(String encodedName) {
deleteNodeInStates(encodedName, "splitting", EventType.RS_ZK_REGION_SPLITTING,
EventType.RS_ZK_REQUEST_REGION_SPLIT, EventType.RS_ZK_REGION_SPLIT);
}
/** /**
* A helper to handle region merging transition event. * A helper to handle region merging transition event.
* It transitions merging regions to MERGING state. * It transitions merging regions to MERGING state.
*/ */
private boolean handleRegionMerging(final RegionTransition rt, private boolean handleRegionMerging(final RegionTransition rt, final String encodedName,
final String prettyPrintedRegionName, final ServerName sn) { final String prettyPrintedRegionName, final ServerName sn) {
if (!serverManager.isServerOnline(sn)) {
LOG.warn("Dropped merging! ServerName=" + sn + " unknown.");
return false;
}
byte [] payloadOfMerging = rt.getPayload(); byte [] payloadOfMerging = rt.getPayload();
List<HRegionInfo> mergingRegions; List<HRegionInfo> mergingRegions;
try { try {
mergingRegions = HRegionInfo.parseDelimitedFrom( mergingRegions = HRegionInfo.parseDelimitedFrom(
payloadOfMerging, 0, payloadOfMerging.length); payloadOfMerging, 0, payloadOfMerging.length);
} catch (IOException e) { } catch (IOException e) {
LOG.error("Dropped merging! Failed reading merging payload for " LOG.error("Dropped merging! Failed reading " + rt.getEventType()
+ prettyPrintedRegionName); + " payload for " + prettyPrintedRegionName);
return false; return false;
} }
assert mergingRegions.size() == 2; assert mergingRegions.size() == 3;
HRegionInfo merging_a = mergingRegions.get(0); HRegionInfo p = mergingRegions.get(0);
HRegionInfo merging_b = mergingRegions.get(1); HRegionInfo hri_a = mergingRegions.get(1);
HRegionInfo hri_b = mergingRegions.get(2);
if (!isInStateForMerging(sn, merging_a, merging_b)) { RegionState rs_p = regionStates.getRegionState(p);
LOG.warn("Dropped merging! Not in state good for MERGING; rs_a=" RegionState rs_a = regionStates.getRegionState(hri_a);
+ merging_a + ", rs_b=" + merging_b); RegionState rs_b = regionStates.getRegionState(hri_b);
if (!((rs_a == null || rs_a.isOpenOrMergingOnServer(sn))
&& (rs_b == null || rs_b.isOpenOrMergingOnServer(sn))
&& (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
LOG.warn("Dropped merging! Not in state good for MERGING; rs_p="
+ rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
return false; return false;
} }
regionStates.updateRegionState(merging_a, State.MERGING);
regionStates.updateRegionState(merging_b, State.MERGING); EventType et = rt.getEventType();
if (et == EventType.RS_ZK_REQUEST_REGION_MERGE) {
try {
if (RegionMergeTransaction.transitionMergingNode(watcher, p,
hri_a, hri_b, sn, -1, EventType.RS_ZK_REQUEST_REGION_MERGE,
EventType.RS_ZK_REGION_MERGING) == -1) {
byte[] data = ZKAssign.getData(watcher, encodedName);
EventType currentType = null;
if (data != null) {
RegionTransition newRt = RegionTransition.parseFrom(data);
currentType = newRt.getEventType();
}
if (currentType == null || (currentType != EventType.RS_ZK_REGION_MERGED
&& currentType != EventType.RS_ZK_REGION_MERGING)) {
LOG.warn("Failed to transition pending_merge node "
+ encodedName + " to merging, it's now " + currentType);
return false;
}
}
} catch (Exception e) {
LOG.warn("Failed to transition pending_merge node "
+ encodedName + " to merging", e);
return false;
}
}
synchronized (regionStates) {
if (regionStates.getRegionState(p) == null) {
regionStates.createRegionState(p);
}
regionStates.updateRegionState(hri_a, State.MERGING);
regionStates.updateRegionState(hri_b, State.MERGING);
if (et != EventType.RS_ZK_REGION_MERGED) {
regionStates.updateRegionState(p, State.MERGING_NEW, sn);;
regionStates.regionOffline(p, State.MERGING_NEW);
this.mergingRegions.put(encodedName,
new PairOfSameType<HRegionInfo>(hri_a, hri_b));
} else {
this.mergingRegions.remove(encodedName);
regionStates.updateRegionState(hri_a, State.MERGED);
regionStates.updateRegionState(hri_b, State.MERGED);
regionOffline(hri_a, State.MERGED);
regionOffline(hri_b, State.MERGED);
regionOnline(p, sn);
}
}
if (et == EventType.RS_ZK_REGION_MERGED) {
LOG.debug("Handling MERGED event for " + encodedName + "; deleting node");
// Remove region from ZK
try {
boolean successful = false;
while (!successful) {
// It's possible that the RS tickles in between the reading of the
// znode and the deleting, so it's safe to retry.
successful = ZKAssign.deleteNode(
watcher, encodedName, EventType.RS_ZK_REGION_MERGED);
}
} catch (KeeperException e) {
if (e instanceof NoNodeException) {
String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
LOG.debug("The znode " + znodePath + " does not exist. May be deleted already.");
} else {
server.abort("Error deleting MERGED node " + encodedName, e);
}
}
LOG.info("Handled MERGED event; merged=" + p.getRegionNameAsString()
+ ", region_a=" + hri_a.getRegionNameAsString() + ", region_b="
+ hri_b.getRegionNameAsString() + ", on " + sn);
// User could disable the table before master knows the new region.
if (zkTable.isDisablingOrDisabledTable(p.getTable())) {
unassign(p);
}
}
return true;
}
/**
* A helper to handle region splitting transition event.
*/
private boolean handleRegionSplitting(final RegionTransition rt, final String encodedName,
final String prettyPrintedRegionName, final ServerName sn) {
if (!serverManager.isServerOnline(sn)) {
LOG.warn("Dropped splitting! ServerName=" + sn + " unknown.");
return false;
}
byte [] payloadOfSplitting = rt.getPayload();
List<HRegionInfo> splittingRegions;
try {
splittingRegions = HRegionInfo.parseDelimitedFrom(
payloadOfSplitting, 0, payloadOfSplitting.length);
} catch (IOException e) {
LOG.error("Dropped splitting! Failed reading " + rt.getEventType()
+ " payload for " + prettyPrintedRegionName);
return false;
}
assert splittingRegions.size() == 2;
HRegionInfo hri_a = splittingRegions.get(0);
HRegionInfo hri_b = splittingRegions.get(1);
RegionState rs_p = regionStates.getRegionState(encodedName);
RegionState rs_a = regionStates.getRegionState(hri_a);
RegionState rs_b = regionStates.getRegionState(hri_b);
if (!((rs_p == null || rs_p.isOpenOrSplittingOnServer(sn))
&& (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
&& (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
LOG.warn("Dropped splitting! Not in state good for SPLITTING; rs_p="
+ rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
return false;
}
if (rs_p == null) {
// Splitting region should be online
rs_p = regionStates.updateRegionState(rt, State.OPEN);
if (rs_p == null) {
LOG.warn("Received splitting for region " + prettyPrintedRegionName
+ " from server " + sn + " but it doesn't exist anymore,"
+ " probably already processed its split");
return false;
}
regionStates.regionOnline(rs_p.getRegion(), sn);
}
HRegionInfo p = rs_p.getRegion();
EventType et = rt.getEventType();
if (et == EventType.RS_ZK_REQUEST_REGION_SPLIT) {
try {
if (SplitTransaction.transitionSplittingNode(watcher, p,
hri_a, hri_b, sn, -1, EventType.RS_ZK_REQUEST_REGION_SPLIT,
EventType.RS_ZK_REGION_SPLITTING) == -1) {
byte[] data = ZKAssign.getData(watcher, encodedName);
EventType currentType = null;
if (data != null) {
RegionTransition newRt = RegionTransition.parseFrom(data);
currentType = newRt.getEventType();
}
if (currentType == null || (currentType != EventType.RS_ZK_REGION_SPLIT
&& currentType != EventType.RS_ZK_REGION_SPLITTING)) {
LOG.warn("Failed to transition pending_split node "
+ encodedName + " to splitting, it's now " + currentType);
return false;
}
}
} catch (Exception e) {
LOG.warn("Failed to transition pending_split node "
+ encodedName + " to splitting", e);
return false;
}
}
synchronized (regionStates) {
if (regionStates.getRegionState(hri_a) == null) {
regionStates.createRegionState(hri_a);
}
if (regionStates.getRegionState(hri_b) == null) {
regionStates.createRegionState(hri_b);
}
regionStates.updateRegionState(hri_a, State.SPLITTING_NEW, sn);
regionStates.updateRegionState(hri_b, State.SPLITTING_NEW, sn);
regionStates.regionOffline(hri_a, State.SPLITTING_NEW);
regionStates.regionOffline(hri_b, State.SPLITTING_NEW);
regionStates.updateRegionState(rt, State.SPLITTING);
// The below is for testing ONLY! We can't do fault injection easily, so
// resort to this kinda uglyness -- St.Ack 02/25/2011.
if (TEST_SKIP_SPLIT_HANDLING) {
LOG.warn("Skipping split message, TEST_SKIP_SPLIT_HANDLING is set");
return true; // return true so that the splitting node stays
}
if (et == EventType.RS_ZK_REGION_SPLIT) {
regionStates.updateRegionState(p, State.SPLIT);
regionOffline(p, State.SPLIT);
regionOnline(hri_a, sn);
regionOnline(hri_b, sn);
}
}
if (et == EventType.RS_ZK_REGION_SPLIT) {
LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node");
// Remove region from ZK
try {
boolean successful = false;
while (!successful) {
// It's possible that the RS tickles in between the reading of the
// znode and the deleting, so it's safe to retry.
successful = ZKAssign.deleteNode(
watcher, encodedName, EventType.RS_ZK_REGION_SPLIT);
}
} catch (KeeperException e) {
if (e instanceof NoNodeException) {
String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
LOG.debug("The znode " + znodePath + " does not exist. May be deleted already.");
} else {
server.abort("Error deleting SPLIT node " + encodedName, e);
}
}
LOG.info("Handled SPLIT event; parent=" + p.getRegionNameAsString()
+ ", daughter a=" + hri_a.getRegionNameAsString() + ", daughter b="
+ hri_b.getRegionNameAsString() + ", on " + sn);
// User could disable the table before master knows the new region.
if (zkTable.isDisablingOrDisabledTable(p.getTable())) {
unassign(hri_a);
unassign(hri_b);
}
}
return true; return true;
} }
@ -3438,21 +3513,4 @@ public class AssignmentManager extends ZooKeeperListener {
// remove the region plan as well just in case. // remove the region plan as well just in case.
clearRegionPlan(regionInfo); clearRegionPlan(regionInfo);
} }
/**
* Online a newly created region, which is usually from split/merge.
*/
private void onlineNewRegion(final HRegionInfo region, final ServerName sn) {
synchronized (regionStates) {
// Someone could find the region from meta and reassign it.
if (regionStates.getRegionState(region) == null) {
regionStates.createRegionState(region);
regionOnline(region, sn);
}
}
// User could disable the table before master knows the new region.
if (zkTable.isDisablingOrDisabledTable(region.getTable())) {
unassign(region);
}
}
} }

View File

@ -190,7 +190,15 @@ public class RegionStates {
*/ */
public synchronized boolean isRegionInState( public synchronized boolean isRegionInState(
final HRegionInfo hri, final State... states) { final HRegionInfo hri, final State... states) {
RegionState regionState = getRegionState(hri); return isRegionInState(hri.getEncodedName(), states);
}
/**
* @return True if specified region is in one of the specified states.
*/
public synchronized boolean isRegionInState(
final String regionName, final State... states) {
RegionState regionState = getRegionState(regionName);
State s = regionState != null ? regionState.getState() : null; State s = regionState != null ? regionState.getState() : null;
for (State state: states) { for (State state: states) {
if (s == state) return true; if (s == state) return true;
@ -358,11 +366,11 @@ public class RegionStates {
if (oldState == null) { if (oldState == null) {
LOG.warn("Online region not in RegionStates: " + hri.getShortNameToLog()); LOG.warn("Online region not in RegionStates: " + hri.getShortNameToLog());
} else { } else {
State state = oldState.getState();
ServerName sn = oldState.getServerName(); ServerName sn = oldState.getServerName();
if (state != State.OPEN || sn == null || !sn.equals(serverName)) { if (!oldState.isReadyToOnline() || sn == null || !sn.equals(serverName)) {
LOG.debug("Online " + hri.getShortNameToLog() + " with current state=" + state + LOG.debug("Online " + hri.getShortNameToLog() + " with current state="
", expected state=OPEN" + ", assigned to server: " + sn + " expected " + serverName); + oldState.getState() + ", expected state=OPEN/MERGING_NEW/SPLITTING_NEW"
+ ", assigned to server: " + sn + " expected " + serverName);
} }
} }
updateRegionState(hri, State.OPEN, serverName); updateRegionState(hri, State.OPEN, serverName);
@ -434,29 +442,28 @@ public class RegionStates {
} }
/** /**
* A region is offline, won't be in transition any more. * A region is offline, won't be in transition any more. Its state
* Its state should be the specified expected state, which * should be the specified expected state, which can only be
* can be Split/Merged/Offline/null(=Offline) only. * Split/Merged/Offline/null(=Offline)/SplittingNew/MergingNew.
*/ */
public synchronized void regionOffline( public synchronized void regionOffline(
final HRegionInfo hri, final State expectedState) { final HRegionInfo hri, final State expectedState) {
Preconditions.checkArgument(expectedState == null Preconditions.checkArgument(expectedState == null
|| expectedState == State.OFFLINE || expectedState == State.SPLIT || RegionState.isNotUnassignableNotInTransition(expectedState),
|| expectedState == State.MERGED, "Offlined region should be in state" "Offlined region should be in state OFFLINE/SPLIT/MERGED/"
+ " OFFLINE/SPLIT/MERGED instead of " + expectedState); + "SPLITTING_NEW/MERGING_NEW instead of " + expectedState);
String regionName = hri.getEncodedName(); String regionName = hri.getEncodedName();
RegionState oldState = regionStates.get(regionName); RegionState oldState = regionStates.get(regionName);
if (oldState == null) { if (oldState == null) {
LOG.warn("Offline region not in RegionStates: " + hri.getShortNameToLog()); LOG.warn("Offline region not in RegionStates: " + hri.getShortNameToLog());
} else if (LOG.isDebugEnabled()) { } else if (LOG.isDebugEnabled()) {
State state = oldState.getState();
ServerName sn = oldState.getServerName(); ServerName sn = oldState.getServerName();
if (state != State.OFFLINE if (!oldState.isReadyToOffline()) {
&& state != State.SPLITTING && state != State.MERGING) {
LOG.debug("Offline " + hri.getShortNameToLog() + " with current state=" LOG.debug("Offline " + hri.getShortNameToLog() + " with current state="
+ state + ", expected state=OFFLINE/SPLITTING/MERGING"); + oldState.getState() + ", expected state=OFFLINE/SPLIT/"
+ "MERGED/SPLITTING_NEW/MERGING_NEW");
} }
if (sn != null && state == State.OFFLINE) { if (sn != null && oldState.isOffline()) {
LOG.debug("Offline " + hri.getShortNameToLog() LOG.debug("Offline " + hri.getShortNameToLog()
+ " with current state=OFFLINE, assigned to server: " + " with current state=OFFLINE, assigned to server: "
+ sn + ", expected null"); + sn + ", expected null");
@ -497,9 +504,8 @@ public class RegionStates {
if (isRegionOnline(region)) { if (isRegionOnline(region)) {
regionsToOffline.add(region); regionsToOffline.add(region);
} else { } else {
RegionState state = getRegionState(region); if (isRegionInState(region, State.SPLITTING, State.MERGING)) {
if (state.isSplitting() || state.isMerging()) { LOG.debug("Offline splitting/merging region " + getRegionState(region));
LOG.debug("Offline splitting/merging region " + state);
try { try {
// Delete the ZNode if exists // Delete the ZNode if exists
ZKAssign.deleteNodeFailSilent(watcher, region); ZKAssign.deleteNodeFailSilent(watcher, region);
@ -512,6 +518,7 @@ public class RegionStates {
} }
for (HRegionInfo hri : regionsToOffline) { for (HRegionInfo hri : regionsToOffline) {
updateRegionState(hri, State.OFFLINE);
regionOffline(hri); regionOffline(hri);
} }

View File

@ -1,117 +0,0 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.master.handler;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
/**
* Handles MERGED regions event on Master, master receive the merge report from
* the regionserver, then offline the merging regions and online the merged
* region.Here region_a sorts before region_b.
*/
@InterfaceAudience.Private
public class MergedRegionHandler extends EventHandler implements
    TotesHRegionInfo {
  private static final Log LOG = LogFactory.getLog(MergedRegionHandler.class);

  private final AssignmentManager assignmentManager;
  // Regions as reported by the regionserver: the merged result first, then
  // the two parents (region_a sorts before region_b).
  private final HRegionInfo merged;
  private final HRegionInfo region_a;
  private final HRegionInfo region_b;
  // Regionserver the merge report originated from.
  private final ServerName sn;

  /**
   * @param server the master server this handler runs on
   * @param assignmentManager used to apply the merge to region states
   * @param sn regionserver that reported the merge
   * @param mergeRegions exactly three regions: the merged region followed by
   *          the two regions that were merged (region_a, region_b)
   */
  public MergedRegionHandler(Server server,
      AssignmentManager assignmentManager, ServerName sn,
      final List<HRegionInfo> mergeRegions) {
    super(server, EventType.RS_ZK_REGION_MERGED);
    assert mergeRegions.size() == 3;
    this.assignmentManager = assignmentManager;
    this.merged = mergeRegions.get(0);
    this.region_a = mergeRegions.get(1);
    this.region_b = mergeRegions.get(2);
    this.sn = sn;
  }

  /** @return the merged (resulting) region this event is about */
  @Override
  public HRegionInfo getHRegionInfo() {
    return this.merged;
  }

  @Override
  public String toString() {
    String name = "UnknownServerName";
    if (server != null && server.getServerName() != null) {
      name = server.getServerName().toString();
    }
    String mergedRegion = "UnknownRegion";
    if (merged != null) {
      mergedRegion = merged.getRegionNameAsString();
    }
    return getClass().getSimpleName() + "-" + name + "-" + getSeqid() + "-"
        + mergedRegion;
  }

  /**
   * Applies the merge report to the assignment manager (offlining the two
   * parents, onlining the merged region) and then deletes the MERGED
   * transition znode, retrying until the delete succeeds or a fatal ZK
   * error aborts the master.
   */
  @Override
  public void process() {
    String encodedRegionName = this.merged.getEncodedName();
    LOG.debug("Handling MERGE event for " + encodedRegionName
        + "; deleting node");
    this.assignmentManager.handleRegionsMergeReport(this.sn, this.merged,
        this.region_a, this.region_b);
    // Remove region from ZK
    try {
      boolean successful = false;
      while (!successful) {
        // It's possible that the RS tickles in between the reading of the
        // znode and the deleting, so it's safe to retry.
        successful = ZKAssign.deleteNode(this.server.getZooKeeper(),
            encodedRegionName, EventType.RS_ZK_REGION_MERGED);
      }
    } catch (KeeperException e) {
      if (e instanceof NoNodeException) {
        // Node already gone: benign, just log it.
        // NOTE(review): this path is built from splitLogZNode, which looks
        // copied from the split handler; the merge transition znode lives
        // under the assignment znode -- confirm the path used in this message.
        String znodePath = ZKUtil.joinZNode(
            this.server.getZooKeeper().splitLogZNode, encodedRegionName);
        LOG.debug("The znode " + znodePath
            + " does not exist. May be deleted already.");
      } else {
        server.abort("Error deleting MERGED node in ZK for transition ZK node ("
            + merged.getEncodedName() + ")", e);
      }
    }
    // Fix: original omitted the space before "region_b=", running the two
    // region names together in the log line.
    LOG.info("Handled MERGED event; merged="
        + this.merged.getRegionNameAsString() + " region_a="
        + this.region_a.getRegionNameAsString() + " region_b="
        + this.region_b.getRegionNameAsString());
  }
}

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.DeadServer; import org.apache.hadoop.hbase.master.DeadServer;
import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.ServerManager; import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.zookeeper.ZKAssign; import org.apache.hadoop.hbase.zookeeper.ZKAssign;
@ -246,10 +247,14 @@ public class ServerShutdownHandler extends EventHandler {
//clean zk node //clean zk node
LOG.info("Reassigning region with rs = " + rit + " and deleting zk node if exists"); LOG.info("Reassigning region with rs = " + rit + " and deleting zk node if exists");
ZKAssign.deleteNodeFailSilent(services.getZooKeeper(), hri); ZKAssign.deleteNodeFailSilent(services.getZooKeeper(), hri);
regionStates.updateRegionState(hri, State.OFFLINE);
} catch (KeeperException ke) { } catch (KeeperException ke) {
this.server.abort("Unexpected ZK exception deleting unassigned node " + hri, ke); this.server.abort("Unexpected ZK exception deleting unassigned node " + hri, ke);
return; return;
} }
} else if (regionStates.isRegionInState(
hri, State.SPLITTING_NEW, State.MERGING_NEW)) {
regionStates.regionOffline(hri);
} }
toAssignRegions.add(hri); toAssignRegions.add(hri);
} else if (rit != null) { } else if (rit != null) {
@ -260,8 +265,9 @@ public class ServerShutdownHandler extends EventHandler {
// The rit that we use may be stale in case the table was in DISABLING state // The rit that we use may be stale in case the table was in DISABLING state
// but though we did assign we will not be clearing the znode in CLOSING state. // but though we did assign we will not be clearing the znode in CLOSING state.
// Doing this will have no harm. See HBASE-5927 // Doing this will have no harm. See HBASE-5927
regionStates.updateRegionState(hri, State.OFFLINE);
am.deleteClosingOrClosedNode(hri); am.deleteClosingOrClosedNode(hri);
am.regionOffline(hri); am.offlineDisabledRegion(hri);
} else { } else {
LOG.warn("THIS SHOULD NOT HAPPEN: unexpected region in transition " LOG.warn("THIS SHOULD NOT HAPPEN: unexpected region in transition "
+ rit + " not to be assigned by SSH of server " + serverName); + rit + " not to be assigned by SSH of server " + serverName);

View File

@ -1,121 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.handler;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
/**
* Handles SPLIT region event on Master.
*/
@InterfaceAudience.Private
public class SplitRegionHandler extends EventHandler implements TotesHRegionInfo {
  private static final Log LOG = LogFactory.getLog(SplitRegionHandler.class);

  private final AssignmentManager assignmentManager;
  // Parent region that was split.
  private final HRegionInfo parent;
  // Regionserver the split report originated from.
  private final ServerName sn;
  // The two daughter regions produced by the split.
  private final List<HRegionInfo> daughters;

  /**
   * For testing only! Set to true to skip handling of split.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
  public static boolean TEST_SKIP = false;

  /**
   * @param server the master server this handler runs on
   * @param assignmentManager used to apply the split to region states
   * @param regionInfo the parent region that was split
   * @param sn regionserver that reported the split
   * @param daughters the two daughter regions (a first, then b)
   */
  public SplitRegionHandler(Server server,
      AssignmentManager assignmentManager, HRegionInfo regionInfo,
      ServerName sn, final List<HRegionInfo> daughters) {
    super(server, EventType.RS_ZK_REGION_SPLIT);
    this.assignmentManager = assignmentManager;
    this.parent = regionInfo;
    this.sn = sn;
    this.daughters = daughters;
  }

  /** @return the parent region this split event is about */
  @Override
  public HRegionInfo getHRegionInfo() {
    return this.parent;
  }

  @Override
  public String toString() {
    String name = "UnknownServerName";
    if(server != null && server.getServerName() != null) {
      name = server.getServerName().toString();
    }
    String parentRegion = "UnknownRegion";
    if(parent != null) {
      parentRegion = parent.getRegionNameAsString();
    }
    return getClass().getSimpleName() + "-" + name + "-" + getSeqid() + "-" + parentRegion;
  }

  /**
   * Applies the split report to the assignment manager and then deletes the
   * SPLIT transition znode, retrying until the delete succeeds or a fatal
   * ZK error aborts the master. No-op when {@link #TEST_SKIP} is set.
   */
  @Override
  public void process() {
    String encodedRegionName = this.parent.getEncodedName();
    LOG.debug("Handling SPLIT event for " + encodedRegionName +
      "; deleting node");
    // The below is for testing ONLY!  We can't do fault injection easily, so
    // resort to this kinda uglyness -- St.Ack 02/25/2011.
    if (TEST_SKIP) {
      LOG.warn("Skipping split message, TEST_SKIP is set");
      return;
    }
    this.assignmentManager.handleSplitReport(this.sn, this.parent,
      this.daughters.get(0), this.daughters.get(1));
    // Remove region from ZK
    try {
      boolean successful = false;
      while (!successful) {
        // It's possible that the RS tickles in between the reading of the
        // znode and the deleting, so it's safe to retry.
        successful = ZKAssign.deleteNode(this.server.getZooKeeper(),
          encodedRegionName,
          EventType.RS_ZK_REGION_SPLIT);
      }
    } catch (KeeperException e) {
      if (e instanceof NoNodeException) {
        // Node already gone: benign, just log it.
        // NOTE(review): splitLogZNode is the distributed-log-splitting root;
        // the split transition znode lives under the assignment znode --
        // confirm the path used in this message.
        String znodePath = ZKUtil.joinZNode(
          this.server.getZooKeeper().splitLogZNode, encodedRegionName);
        LOG.debug("The znode " + znodePath
          + " does not exist. May be deleted already.");
      } else {
        server.abort("Error deleting SPLIT node in ZK for transition ZK node (" +
          parent.getEncodedName() + ")", e);
      }
    }
    // Fix: original omitted the space before "daughter b=", running the two
    // daughter names together in the log line.
    LOG.info("Handled SPLIT event; parent=" +
      this.parent.getRegionNameAsString() +
      " daughter a=" + this.daughters.get(0).getRegionNameAsString() +
      " daughter b=" + this.daughters.get(1).getRegionNameAsString());
  }
}

View File

@ -66,7 +66,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.DroppedSnapshotException;
@ -114,12 +113,9 @@ import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry; import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLog;
@ -2781,6 +2777,11 @@ public class HRegion implements HeapSize { // , Writable{
FileSystem fs = this.fs.getFileSystem(); FileSystem fs = this.fs.getFileSystem();
NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir); NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
if (LOG.isDebugEnabled()) {
LOG.debug("Found " + (files == null ? 0 : files.size())
+ " recovered edits file(s) under " + regiondir);
}
if (files == null || files.isEmpty()) return seqid; if (files == null || files.isEmpty()) return seqid;
for (Path edits: files) { for (Path edits: files) {
@ -2794,10 +2795,12 @@ public class HRegion implements HeapSize { // , Writable{
String fileName = edits.getName(); String fileName = edits.getName();
maxSeqId = Math.abs(Long.parseLong(fileName)); maxSeqId = Math.abs(Long.parseLong(fileName));
if (maxSeqId <= minSeqIdForTheRegion) { if (maxSeqId <= minSeqIdForTheRegion) {
if (LOG.isDebugEnabled()) {
String msg = "Maximum sequenceid for this log is " + maxSeqId String msg = "Maximum sequenceid for this log is " + maxSeqId
+ " and minimum sequenceid for the region is " + minSeqIdForTheRegion + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
+ ", skipped the whole file, path=" + edits; + ", skipped the whole file, path=" + edits;
LOG.debug(msg); LOG.debug(msg);
}
continue; continue;
} }

View File

@ -18,6 +18,10 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGED;
import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGING;
import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_MERGE;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -45,6 +49,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException; import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.data.Stat;
/** /**
* Executes region merge as a "transaction". It is similar with * Executes region merge as a "transaction". It is similar with
@ -261,25 +266,16 @@ public class RegionMergeTransaction {
createNodeMerging(server.getZooKeeper(), this.mergedRegionInfo, createNodeMerging(server.getZooKeeper(), this.mergedRegionInfo,
server.getServerName(), region_a.getRegionInfo(), region_b.getRegionInfo()); server.getServerName(), region_a.getRegionInfo(), region_b.getRegionInfo());
} catch (KeeperException e) { } catch (KeeperException e) {
throw new IOException("Failed creating MERGING znode on " throw new IOException("Failed creating PENDING_MERGE znode on "
+ this.mergedRegionInfo.getRegionNameAsString(), e); + this.mergedRegionInfo.getRegionNameAsString(), e);
} }
} }
this.journal.add(JournalEntry.SET_MERGING_IN_ZK); this.journal.add(JournalEntry.SET_MERGING_IN_ZK);
if (server != null && server.getZooKeeper() != null) { if (server != null && server.getZooKeeper() != null) {
try { // After creating the merge node, wait for master to transition it
// Transition node from MERGING to MERGING after creating the merge // from PENDING_MERGE to MERGING so that we can move on. We want master
// node. Master will get the callback for node change only if the // knows about it and won't transition any region which is merging.
// transition is successful. znodeVersion = getZKNode(server, services);
// Note that if the transition fails then the rollback will delete the
// created znode as the journal entry SET_MERGING_IN_ZK is added.
this.znodeVersion = transitionNodeMerging(server.getZooKeeper(),
this.mergedRegionInfo, server.getServerName(), -1,
region_a.getRegionInfo(), region_b.getRegionInfo());
} catch (KeeperException e) {
throw new IOException("Failed setting MERGING znode on "
+ this.mergedRegionInfo.getRegionNameAsString(), e);
}
} }
this.region_a.getRegionFileSystem().createMergesDir(); this.region_a.getRegionFileSystem().createMergesDir();
@ -303,9 +299,10 @@ public class RegionMergeTransaction {
try { try {
// Do one more check on the merging znode (before it is too late) in case // Do one more check on the merging znode (before it is too late) in case
// any merging region is moved somehow. If so, the znode transition will fail. // any merging region is moved somehow. If so, the znode transition will fail.
this.znodeVersion = transitionNodeMerging(server.getZooKeeper(), this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
this.mergedRegionInfo, server.getServerName(), this.znodeVersion, this.mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(),
region_a.getRegionInfo(), region_b.getRegionInfo()); server.getServerName(), this.znodeVersion,
RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGING);
} catch (KeeperException e) { } catch (KeeperException e) {
throw new IOException("Failed setting MERGING znode on " throw new IOException("Failed setting MERGING znode on "
+ this.mergedRegionInfo.getRegionNameAsString(), e); + this.mergedRegionInfo.getRegionNameAsString(), e);
@ -489,9 +486,10 @@ public class RegionMergeTransaction {
// Tell master about merge by updating zk. If we fail, abort. // Tell master about merge by updating zk. If we fail, abort.
try { try {
this.znodeVersion = transitionNodeMerge(server.getZooKeeper(), this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
this.mergedRegionInfo, region_a.getRegionInfo(), this.mergedRegionInfo, region_a.getRegionInfo(),
region_b.getRegionInfo(), server.getServerName(), this.znodeVersion); region_b.getRegionInfo(), server.getServerName(), this.znodeVersion,
RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGED);
long startTime = EnvironmentEdgeManager.currentTimeMillis(); long startTime = EnvironmentEdgeManager.currentTimeMillis();
int spins = 0; int spins = 0;
@ -506,9 +504,10 @@ public class RegionMergeTransaction {
} }
Thread.sleep(100); Thread.sleep(100);
// When this returns -1 it means the znode doesn't exist // When this returns -1 it means the znode doesn't exist
this.znodeVersion = tickleNodeMerge(server.getZooKeeper(), this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
this.mergedRegionInfo, region_a.getRegionInfo(), this.mergedRegionInfo, region_a.getRegionInfo(),
region_b.getRegionInfo(), server.getServerName(), this.znodeVersion); region_b.getRegionInfo(), server.getServerName(), this.znodeVersion,
RS_ZK_REGION_MERGED, RS_ZK_REGION_MERGED);
spins++; spins++;
} while (this.znodeVersion != -1 && !server.isStopped() } while (this.znodeVersion != -1 && !server.isStopped()
&& !services.isStopping()); && !services.isStopping());
@ -520,12 +519,83 @@ public class RegionMergeTransaction {
+ mergedRegionInfo.getEncodedName(), e); + mergedRegionInfo.getEncodedName(), e);
} }
// Leaving here, the mergedir with its dross will be in place but since the // Leaving here, the mergedir with its dross will be in place but since the
// merge was successful, just leave it; it'll be cleaned when region_a is // merge was successful, just leave it; it'll be cleaned when region_a is
// cleaned up by CatalogJanitor on master // cleaned up by CatalogJanitor on master
} }
/**
* Wait for the merging node to be transitioned from pending_merge
* to merging by master. That's how we are sure master has processed
* the event and is good with us to move on. If we don't get any update,
* we periodically transition the node so that master gets the callback.
* If the node is removed or is not in pending_merge state any more,
* we abort the merge.
*/
private int getZKNode(final Server server,
final RegionServerServices services) throws IOException {
// Wait for the master to process the pending_merge.
try {
int spins = 0;
Stat stat = new Stat();
ZooKeeperWatcher zkw = server.getZooKeeper();
ServerName expectedServer = server.getServerName();
String node = mergedRegionInfo.getEncodedName();
while (!(server.isStopped() || services.isStopping())) {
if (spins % 5 == 0) {
LOG.debug("Still waiting for master to process "
+ "the pending_merge for " + node);
transitionMergingNode(zkw, mergedRegionInfo, region_a.getRegionInfo(),
region_b.getRegionInfo(), expectedServer, -1, RS_ZK_REQUEST_REGION_MERGE,
RS_ZK_REQUEST_REGION_MERGE);
}
Thread.sleep(100);
spins++;
byte [] data = ZKAssign.getDataNoWatch(zkw, node, stat);
if (data == null) {
throw new IOException("Data is null, merging node "
+ node + " no longer exists");
}
RegionTransition rt = RegionTransition.parseFrom(data);
EventType et = rt.getEventType();
if (et == RS_ZK_REGION_MERGING) {
ServerName serverName = rt.getServerName();
if (!serverName.equals(expectedServer)) {
throw new IOException("Merging node " + node + " is for "
+ serverName + ", not us " + expectedServer);
}
byte [] payloadOfMerging = rt.getPayload();
List<HRegionInfo> mergingRegions = HRegionInfo.parseDelimitedFrom(
payloadOfMerging, 0, payloadOfMerging.length);
assert mergingRegions.size() == 3;
HRegionInfo a = mergingRegions.get(1);
HRegionInfo b = mergingRegions.get(2);
HRegionInfo hri_a = region_a.getRegionInfo();
HRegionInfo hri_b = region_b.getRegionInfo();
if (!(hri_a.equals(a) && hri_b.equals(b))) {
throw new IOException("Merging node " + node + " is for " + a + ", "
+ b + ", not expected regions: " + hri_a + ", " + hri_b);
}
// Master has processed it.
return stat.getVersion();
}
if (et != RS_ZK_REQUEST_REGION_MERGE) {
throw new IOException("Merging node " + node
+ " moved out of merging to " + et);
}
}
// Server is stopping/stopped
throw new IOException("Server is "
+ (services.isStopping() ? "stopping" : "stopped"));
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new IOException("Failed getting MERGING znode on "
+ mergedRegionInfo.getRegionNameAsString(), e);
}
}
/** /**
* Create reference file(s) of merging regions under the region_a merges dir * Create reference file(s) of merging regions under the region_a merges dir
* @param hstoreFilesOfRegionA * @param hstoreFilesOfRegionA
@ -566,6 +636,7 @@ public class RegionMergeTransaction {
* of no return and so now need to abort the server to minimize * of no return and so now need to abort the server to minimize
* damage. * damage.
*/ */
@SuppressWarnings("deprecation")
public boolean rollback(final Server server, public boolean rollback(final Server server,
final RegionServerServices services) throws IOException { final RegionServerServices services) throws IOException {
assert this.mergedRegionInfo != null; assert this.mergedRegionInfo != null;
@ -653,18 +724,20 @@ public class RegionMergeTransaction {
private static void cleanZK(final Server server, final HRegionInfo hri) { private static void cleanZK(final Server server, final HRegionInfo hri) {
try { try {
// Only delete if its in expected state; could have been hijacked. // Only delete if its in expected state; could have been hijacked.
if (!ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
RS_ZK_REQUEST_REGION_MERGE)) {
ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(), ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
EventType.RS_ZK_REGION_MERGING); RS_ZK_REGION_MERGING);
}
} catch (KeeperException.NoNodeException e) { } catch (KeeperException.NoNodeException e) {
LOG.warn("Failed cleanup zk node of " + hri.getRegionNameAsString(), e); LOG.warn("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
} catch (KeeperException e) { } catch (KeeperException e) {
server.abort("Failed cleanup zk node of " + hri.getRegionNameAsString(),e); server.abort("Failed cleanup zk node of " + hri.getRegionNameAsString(),e);
} }
} }
/** /**
* Creates a new ephemeral node in the MERGING state for the merged region. * Creates a new ephemeral node in the PENDING_MERGE state for the merged region.
* Create it ephemeral in case regionserver dies mid-merge. * Create it ephemeral in case regionserver dies mid-merge.
* *
* <p> * <p>
@ -674,32 +747,27 @@ public class RegionMergeTransaction {
* @param zkw zk reference * @param zkw zk reference
* @param region region to be created as offline * @param region region to be created as offline
* @param serverName server event originates from * @param serverName server event originates from
* @return Version of znode created.
* @throws KeeperException * @throws KeeperException
* @throws IOException * @throws IOException
*/ */
int createNodeMerging(final ZooKeeperWatcher zkw, final HRegionInfo region, public static void createNodeMerging(final ZooKeeperWatcher zkw, final HRegionInfo region,
final ServerName serverName, final HRegionInfo a, final ServerName serverName, final HRegionInfo a,
final HRegionInfo b) throws KeeperException, IOException { final HRegionInfo b) throws KeeperException, IOException {
LOG.debug(zkw.prefix("Creating ephemeral node for " LOG.debug(zkw.prefix("Creating ephemeral node for "
+ region.getEncodedName() + " in MERGING state")); + region.getEncodedName() + " in PENDING_MERGE state"));
byte [] payload = HRegionInfo.toDelimitedByteArray(a, b); byte [] payload = HRegionInfo.toDelimitedByteArray(region, a, b);
RegionTransition rt = RegionTransition.createRegionTransition( RegionTransition rt = RegionTransition.createRegionTransition(
EventType.RS_ZK_REGION_MERGING, region.getRegionName(), serverName, payload); RS_ZK_REQUEST_REGION_MERGE, region.getRegionName(), serverName, payload);
String node = ZKAssign.getNodeName(zkw, region.getEncodedName()); String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) { if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) {
throw new IOException("Failed create of ephemeral " + node); throw new IOException("Failed create of ephemeral " + node);
} }
// Transition node from MERGING to MERGING and pick up version so we
// can be sure this znode is ours; version is needed deleting.
return transitionNodeMerging(zkw, region, serverName, -1, a, b);
} }
/** /**
* Transitions an existing node for the specified region which is currently in * Transitions an existing ephemeral node for the specified region which is
* the MERGING state to be in the MERGE state. Converts the ephemeral MERGING * currently in the begin state to be in the end state. Master cleans up the
* znode to an ephemeral MERGE node. Master cleans up MERGE znode when it * final MERGE znode when it reads it (or if we crash, zk will clean it up).
* reads it (or if we crash, zk will clean it up).
* *
* <p> * <p>
* Does not transition nodes from other states. If for some reason the node * Does not transition nodes from other states. If for some reason the node
@ -710,19 +778,18 @@ public class RegionMergeTransaction {
* This method can fail and return false for three different reasons: * This method can fail and return false for three different reasons:
* <ul> * <ul>
* <li>Node for this region does not exist</li> * <li>Node for this region does not exist</li>
* <li>Node for this region is not in MERGING state</li> * <li>Node for this region is not in the begin state</li>
* <li>After verifying MERGING state, update fails because of wrong version * <li>After verifying the begin state, update fails because of wrong version
* (this should never actually happen since an RS only does this transition * (this should never actually happen since an RS only does this transition
* following a transition to MERGING. if two RS are conflicting, one would * following a transition to the begin state. If two RS are conflicting, one would
* fail the original transition to MERGING and not this transition)</li> * fail the original transition to the begin state and not this transition)</li>
* </ul> * </ul>
* *
* <p> * <p>
* Does not set any watches. * Does not set any watches.
* *
* <p> * <p>
* This method should only be used by a RegionServer when completing the open * This method should only be used by a RegionServer when merging two regions.
* of merged region.
* *
* @param zkw zk reference * @param zkw zk reference
* @param merged region to be transitioned to opened * @param merged region to be transitioned to opened
@ -730,45 +797,19 @@ public class RegionMergeTransaction {
* @param b merging region B * @param b merging region B
* @param serverName server event originates from * @param serverName server event originates from
* @param znodeVersion expected version of data before modification * @param znodeVersion expected version of data before modification
* @param beginState the expected current state the znode should be
* @param endState the state to be transition to
* @return version of node after transition, -1 if unsuccessful transition * @return version of node after transition, -1 if unsuccessful transition
* @throws KeeperException if unexpected zookeeper exception * @throws KeeperException if unexpected zookeeper exception
* @throws IOException * @throws IOException
*/ */
private static int transitionNodeMerge(ZooKeeperWatcher zkw, public static int transitionMergingNode(ZooKeeperWatcher zkw,
HRegionInfo merged, HRegionInfo a, HRegionInfo b, ServerName serverName, HRegionInfo merged, HRegionInfo a, HRegionInfo b, ServerName serverName,
final int znodeVersion) throws KeeperException, IOException { final int znodeVersion, final EventType beginState,
final EventType endState) throws KeeperException, IOException {
byte[] payload = HRegionInfo.toDelimitedByteArray(merged, a, b); byte[] payload = HRegionInfo.toDelimitedByteArray(merged, a, b);
return ZKAssign.transitionNode(zkw, merged, serverName, return ZKAssign.transitionNode(zkw, merged, serverName,
EventType.RS_ZK_REGION_MERGING, EventType.RS_ZK_REGION_MERGED, beginState, endState, znodeVersion, payload);
znodeVersion, payload);
}
/**
*
* @param zkw zk reference
* @param parent region to be transitioned to merging
* @param serverName server event originates from
* @param version znode version
* @return version of node after transition, -1 if unsuccessful transition
* @throws KeeperException
* @throws IOException
*/
int transitionNodeMerging(final ZooKeeperWatcher zkw,
final HRegionInfo parent, final ServerName serverName, final int version,
final HRegionInfo a, final HRegionInfo b) throws KeeperException, IOException {
byte[] payload = HRegionInfo.toDelimitedByteArray(a, b);
return ZKAssign.transitionNode(zkw, parent, serverName,
EventType.RS_ZK_REGION_MERGING, EventType.RS_ZK_REGION_MERGING,
version, payload);
}
private static int tickleNodeMerge(ZooKeeperWatcher zkw, HRegionInfo merged,
HRegionInfo a, HRegionInfo b, ServerName serverName,
final int znodeVersion) throws KeeperException, IOException {
byte[] payload = HRegionInfo.toDelimitedByteArray(a, b);
return ZKAssign.transitionNode(zkw, merged, serverName,
EventType.RS_ZK_REGION_MERGED, EventType.RS_ZK_REGION_MERGED,
znodeVersion, payload);
} }
/** /**

View File

@ -18,6 +18,10 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_SPLIT;
import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLIT;
import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLITTING;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -54,6 +58,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException; import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.data.Stat;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -296,27 +301,18 @@ public class SplitTransaction {
if (server != null && server.getZooKeeper() != null) { if (server != null && server.getZooKeeper() != null) {
try { try {
createNodeSplitting(server.getZooKeeper(), createNodeSplitting(server.getZooKeeper(),
this.parent.getRegionInfo(), server.getServerName()); parent.getRegionInfo(), server.getServerName(), hri_a, hri_b);
} catch (KeeperException e) { } catch (KeeperException e) {
throw new IOException("Failed creating SPLITTING znode on " + throw new IOException("Failed creating PENDING_SPLIT znode on " +
this.parent.getRegionNameAsString(), e); this.parent.getRegionNameAsString(), e);
} }
} }
this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK); this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);
if (server != null && server.getZooKeeper() != null) { if (server != null && server.getZooKeeper() != null) {
try { // After creating the split node, wait for master to transition it
// Transition node from SPLITTING to SPLITTING after creating the split node. // from PENDING_SPLIT to SPLITTING so that we can move on. We want master
// Master will get the callback for node change only if the transition is successful. // knows about it and won't transition any region which is splitting.
// Note that if the transition fails then the rollback will delete the created znode znodeVersion = getZKNode(server, services);
// as the journal entry SET_SPLITTING_IN_ZK is added.
// TODO : May be we can add some new state to znode and handle the new state incase
// of success/failure
this.znodeVersion = transitionNodeSplitting(server.getZooKeeper(),
this.parent.getRegionInfo(), server.getServerName(), -1);
} catch (KeeperException e) {
throw new IOException("Failed setting SPLITTING znode on "
+ this.parent.getRegionNameAsString(), e);
}
} }
this.parent.getRegionFileSystem().createSplitsDir(); this.parent.getRegionFileSystem().createSplitsDir();
@ -444,9 +440,10 @@ public class SplitTransaction {
// Tell master about split by updating zk. If we fail, abort. // Tell master about split by updating zk. If we fail, abort.
if (server != null && server.getZooKeeper() != null) { if (server != null && server.getZooKeeper() != null) {
try { try {
this.znodeVersion = transitionNodeSplit(server.getZooKeeper(), this.znodeVersion = transitionSplittingNode(server.getZooKeeper(),
parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(), parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
server.getServerName(), this.znodeVersion); server.getServerName(), this.znodeVersion,
RS_ZK_REGION_SPLITTING, RS_ZK_REGION_SPLIT);
int spins = 0; int spins = 0;
// Now wait for the master to process the split. We know it's done // Now wait for the master to process the split. We know it's done
@ -459,9 +456,10 @@ public class SplitTransaction {
} }
Thread.sleep(100); Thread.sleep(100);
// When this returns -1 it means the znode doesn't exist // When this returns -1 it means the znode doesn't exist
this.znodeVersion = tickleNodeSplit(server.getZooKeeper(), this.znodeVersion = transitionSplittingNode(server.getZooKeeper(),
parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(), parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
server.getServerName(), this.znodeVersion); server.getServerName(), this.znodeVersion,
RS_ZK_REGION_SPLIT, RS_ZK_REGION_SPLIT);
spins++; spins++;
} while (this.znodeVersion != -1 && !server.isStopped() } while (this.znodeVersion != -1 && !server.isStopped()
&& !services.isStopping()); && !services.isStopping());
@ -483,6 +481,76 @@ public class SplitTransaction {
// deleted and cleaned up. // deleted and cleaned up.
} }
/**
 * Wait for the splitting node to be transitioned from pending_split
 * to splitting by master. That's how we are sure master has processed
 * the event and is good with us to move on. If we don't get any update,
 * we periodically transition the node so that master gets the callback.
 * If the node is removed or is not in pending_split state any more,
 * we abort the split.
 *
 * @param server the hosting server; supplies the ZooKeeper watcher and our
 *   own ServerName, and is polled for shutdown
 * @param services region server services, polled for stopping state
 * @return the znode version observed when master moved the node to
 *   SPLITTING; used later for compare-and-set transitions
 * @throws IOException if the znode disappears, belongs to another server,
 *   carries unexpected daughters, moves to an unexpected state, the server
 *   is stopping, or any underlying ZK/parse error occurs (original cause
 *   is preserved as the IOException cause)
 */
private int getZKNode(final Server server,
    final RegionServerServices services) throws IOException {
  // Wait for the master to process the pending_split.
  try {
    int spins = 0;
    Stat stat = new Stat();
    ZooKeeperWatcher zkw = server.getZooKeeper();
    ServerName expectedServer = server.getServerName();
    String node = parent.getRegionInfo().getEncodedName();
    // Poll every 100ms until the server is shutting down or master reacts.
    while (!(server.isStopped() || services.isStopping())) {
      if (spins % 5 == 0) {
        // Every 5th spin (~500ms), re-transition the node to its own
        // PENDING_SPLIT state. Presumably this no-op transition re-fires the
        // master's ZK watch in case an earlier notification was missed —
        // TODO(review): confirm against master-side watch handling.
        LOG.debug("Still waiting for master to process "
          + "the pending_split for " + node);
        transitionSplittingNode(zkw, parent.getRegionInfo(),
          hri_a, hri_b, expectedServer, -1, RS_ZK_REQUEST_REGION_SPLIT,
          RS_ZK_REQUEST_REGION_SPLIT);
      }
      Thread.sleep(100);
      spins++;
      // Read the node without (re)setting a watch; stat captures the version.
      byte [] data = ZKAssign.getDataNoWatch(zkw, node, stat);
      if (data == null) {
        // Node gone: someone (master or another actor) deleted it; abort.
        throw new IOException("Data is null, splitting node "
          + node + " no longer exists");
      }
      RegionTransition rt = RegionTransition.parseFrom(data);
      EventType et = rt.getEventType();
      if (et == RS_ZK_REGION_SPLITTING) {
        // Master acknowledged the request. Sanity-check the node is still
        // ours and still describes the daughters we asked for before
        // trusting its version.
        ServerName serverName = rt.getServerName();
        if (!serverName.equals(expectedServer)) {
          throw new IOException("Splitting node " + node + " is for "
            + serverName + ", not us " + expectedServer);
        }
        byte [] payloadOfSplitting = rt.getPayload();
        // Payload is the two daughter HRegionInfos, delimited-serialized.
        List<HRegionInfo> splittingRegions = HRegionInfo.parseDelimitedFrom(
          payloadOfSplitting, 0, payloadOfSplitting.length);
        assert splittingRegions.size() == 2;
        HRegionInfo a = splittingRegions.get(0);
        HRegionInfo b = splittingRegions.get(1);
        if (!(hri_a.equals(a) && hri_b.equals(b))) {
          throw new IOException("Splitting node " + node + " is for " + a + ", "
            + b + ", not expected daughters: " + hri_a + ", " + hri_b);
        }
        // Master has processed it.
        return stat.getVersion();
      }
      if (et != RS_ZK_REQUEST_REGION_SPLIT) {
        // Any state other than PENDING_SPLIT or SPLITTING means the node
        // was hijacked or the protocol was violated; abort the split.
        throw new IOException("Splitting node " + node
          + " moved out of splitting to " + et);
      }
    }
    // Server is stopping/stopped
    throw new IOException("Server is "
      + (services.isStopping() ? "stopping" : "stopped"));
  } catch (Exception e) {
    // Broad catch: ZK, parse, and interrupt failures are all rewrapped as
    // IOException with the cause preserved. Restore the interrupt flag
    // first so callers/executors still see the interruption.
    if (e instanceof InterruptedException) {
      Thread.currentThread().interrupt();
    }
    throw new IOException("Failed getting SPLITTING znode on "
      + parent.getRegionNameAsString(), e);
  }
}
/** /**
* Run the transaction. * Run the transaction.
* @param server Hosting server instance. Can be null when testing (won't try * @param server Hosting server instance. Can be null when testing (won't try
@ -719,6 +787,7 @@ public class SplitTransaction {
* @return True if we successfully rolled back, false if we got to the point * @return True if we successfully rolled back, false if we got to the point
* of no return and so now need to abort the server to minimize damage. * of no return and so now need to abort the server to minimize damage.
*/ */
@SuppressWarnings("deprecation")
public boolean rollback(final Server server, final RegionServerServices services) public boolean rollback(final Server server, final RegionServerServices services)
throws IOException { throws IOException {
// Coprocessor callback // Coprocessor callback
@ -801,15 +870,20 @@ public class SplitTransaction {
private static void cleanZK(final Server server, final HRegionInfo hri) { private static void cleanZK(final Server server, final HRegionInfo hri) {
try { try {
// Only delete if its in expected state; could have been hijacked. // Only delete if its in expected state; could have been hijacked.
if (!ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
RS_ZK_REQUEST_REGION_SPLIT)) {
ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(), ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
EventType.RS_ZK_REGION_SPLITTING); RS_ZK_REGION_SPLITTING);
}
} catch (KeeperException.NoNodeException e) {
LOG.warn("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
} catch (KeeperException e) { } catch (KeeperException e) {
server.abort("Failed cleanup of " + hri.getRegionNameAsString(), e); server.abort("Failed cleanup of " + hri.getRegionNameAsString(), e);
} }
} }
/** /**
* Creates a new ephemeral node in the SPLITTING state for the specified region. * Creates a new ephemeral node in the PENDING_SPLIT state for the specified region.
* Create it ephemeral in case regionserver dies mid-split. * Create it ephemeral in case regionserver dies mid-split.
* *
* <p>Does not transition nodes from other states. If a node already exists * <p>Does not transition nodes from other states. If a node already exists
@ -818,30 +892,27 @@ public class SplitTransaction {
* @param zkw zk reference * @param zkw zk reference
* @param region region to be created as offline * @param region region to be created as offline
* @param serverName server event originates from * @param serverName server event originates from
* @return Version of znode created.
* @throws KeeperException * @throws KeeperException
* @throws IOException * @throws IOException
*/ */
int createNodeSplitting(final ZooKeeperWatcher zkw, final HRegionInfo region, public static void createNodeSplitting(final ZooKeeperWatcher zkw, final HRegionInfo region,
final ServerName serverName) throws KeeperException, IOException { final ServerName serverName, final HRegionInfo a,
final HRegionInfo b) throws KeeperException, IOException {
LOG.debug(zkw.prefix("Creating ephemeral node for " + LOG.debug(zkw.prefix("Creating ephemeral node for " +
region.getEncodedName() + " in SPLITTING state")); region.getEncodedName() + " in PENDING_SPLIT state"));
RegionTransition rt = RegionTransition.createRegionTransition(EventType.RS_ZK_REGION_SPLITTING, byte [] payload = HRegionInfo.toDelimitedByteArray(a, b);
region.getRegionName(), serverName); RegionTransition rt = RegionTransition.createRegionTransition(
RS_ZK_REQUEST_REGION_SPLIT, region.getRegionName(), serverName, payload);
String node = ZKAssign.getNodeName(zkw, region.getEncodedName()); String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) { if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) {
throw new IOException("Failed create of ephemeral " + node); throw new IOException("Failed create of ephemeral " + node);
} }
// Transition node from SPLITTING to SPLITTING and pick up version so we
// can be sure this znode is ours; version is needed deleting.
return transitionNodeSplitting(zkw, region, serverName, -1);
} }
/** /**
* Transitions an existing node for the specified region which is * Transitions an existing ephemeral node for the specified region which is
* currently in the SPLITTING state to be in the SPLIT state. Converts the * currently in the begin state to be in the end state. Master cleans up the
* ephemeral SPLITTING znode to an ephemeral SPLIT node. Master cleans up * final SPLIT znode when it reads it (or if we crash, zk will clean it up).
* SPLIT znode when it reads it (or if we crash, zk will clean it up).
* *
* <p>Does not transition nodes from other states. If for some reason the * <p>Does not transition nodes from other states. If for some reason the
* node could not be transitioned, the method returns -1. If the transition * node could not be transitioned, the method returns -1. If the transition
@ -849,60 +920,35 @@ public class SplitTransaction {
* *
* <p>This method can fail and return false for three different reasons: * <p>This method can fail and return false for three different reasons:
* <ul><li>Node for this region does not exist</li> * <ul><li>Node for this region does not exist</li>
* <li>Node for this region is not in SPLITTING state</li> * <li>Node for this region is not in the begin state</li>
* <li>After verifying SPLITTING state, update fails because of wrong version * <li>After verifying the begin state, update fails because of wrong version
* (this should never actually happen since an RS only does this transition * (this should never actually happen since an RS only does this transition
* following a transition to SPLITTING. if two RS are conflicting, one would * following a transition to the begin state. If two RS are conflicting, one would
* fail the original transition to SPLITTING and not this transition)</li> * fail the original transition to the begin state and not this transition)</li>
* </ul> * </ul>
* *
* <p>Does not set any watches. * <p>Does not set any watches.
* *
* <p>This method should only be used by a RegionServer when completing the * <p>This method should only be used by a RegionServer when splitting a region.
* open of a region.
* *
* @param zkw zk reference * @param zkw zk reference
* @param parent region to be transitioned to opened * @param parent region to be transitioned to opened
* @param a Daughter a of split * @param a Daughter a of split
* @param b Daughter b of split * @param b Daughter b of split
* @param serverName server event originates from * @param serverName server event originates from
* @param znodeVersion expected version of data before modification
* @param beginState the expected current state the znode should be
* @param endState the state to be transition to
* @return version of node after transition, -1 if unsuccessful transition * @return version of node after transition, -1 if unsuccessful transition
* @throws KeeperException if unexpected zookeeper exception * @throws KeeperException if unexpected zookeeper exception
* @throws IOException * @throws IOException
*/ */
private static int transitionNodeSplit(ZooKeeperWatcher zkw, public static int transitionSplittingNode(ZooKeeperWatcher zkw,
HRegionInfo parent, HRegionInfo a, HRegionInfo b, ServerName serverName, HRegionInfo parent, HRegionInfo a, HRegionInfo b, ServerName serverName,
final int znodeVersion) final int znodeVersion, final EventType beginState,
throws KeeperException, IOException { final EventType endState) throws KeeperException, IOException {
byte [] payload = HRegionInfo.toDelimitedByteArray(a, b); byte [] payload = HRegionInfo.toDelimitedByteArray(a, b);
return ZKAssign.transitionNode(zkw, parent, serverName, return ZKAssign.transitionNode(zkw, parent, serverName,
EventType.RS_ZK_REGION_SPLITTING, EventType.RS_ZK_REGION_SPLIT, beginState, endState, znodeVersion, payload);
znodeVersion, payload);
}
/**
*
* @param zkw zk reference
* @param parent region to be transitioned to splitting
* @param serverName server event originates from
* @param version znode version
* @return version of node after transition, -1 if unsuccessful transition
* @throws KeeperException
* @throws IOException
*/
int transitionNodeSplitting(final ZooKeeperWatcher zkw, final HRegionInfo parent,
final ServerName serverName, final int version) throws KeeperException, IOException {
return ZKAssign.transitionNode(zkw, parent, serverName,
EventType.RS_ZK_REGION_SPLITTING, EventType.RS_ZK_REGION_SPLITTING, version);
}
private static int tickleNodeSplit(ZooKeeperWatcher zkw,
HRegionInfo parent, HRegionInfo a, HRegionInfo b, ServerName serverName,
final int znodeVersion)
throws KeeperException, IOException {
byte [] payload = HRegionInfo.toDelimitedByteArray(a, b);
return ZKAssign.transitionNode(zkw, parent, serverName,
EventType.RS_ZK_REGION_SPLIT, EventType.RS_ZK_REGION_SPLIT,
znodeVersion, payload);
} }
} }

View File

@ -84,7 +84,8 @@ public class TestMaster {
LOG.info("Splitting table"); LOG.info("Splitting table");
TEST_UTIL.getHBaseAdmin().split(TABLENAME.getName()); TEST_UTIL.getHBaseAdmin().split(TABLENAME.getName());
LOG.info("Waiting for split result to be about to open"); LOG.info("Waiting for split result to be about to open");
while (!m.assignmentManager.wasSplitHandlerCalled()) { RegionStates regionStates = m.assignmentManager.getRegionStates();
while (regionStates.getRegionsOfTable(TABLENAME).size() <= 1) {
Thread.sleep(100); Thread.sleep(100);
} }
LOG.info("Making sure we can call getTableRegions while opening"); LOG.info("Making sure we can call getTableRegions while opening");

View File

@ -37,7 +37,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
@ -47,14 +46,17 @@ import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.RegionTransition; import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
@ -148,7 +150,7 @@ public class TestMasterFailover {
* </ul> * </ul>
* @throws Exception * @throws Exception
*/ */
@Test (timeout=180000) @Test (timeout=240000)
public void testMasterFailoverWithMockedRIT() throws Exception { public void testMasterFailoverWithMockedRIT() throws Exception {
final int NUM_MASTERS = 1; final int NUM_MASTERS = 1;
@ -214,10 +216,30 @@ public class TestMasterFailover {
List<HRegionInfo> disabledRegions = TEST_UTIL.createMultiRegionsInMeta( List<HRegionInfo> disabledRegions = TEST_UTIL.createMultiRegionsInMeta(
TEST_UTIL.getConfiguration(), htdDisabled, SPLIT_KEYS); TEST_UTIL.getConfiguration(), htdDisabled, SPLIT_KEYS);
TableName tableWithMergingRegions = TableName.valueOf("tableWithMergingRegions");
TEST_UTIL.createTable(tableWithMergingRegions, FAMILY, new byte [][] {Bytes.toBytes("m")});
log("Regions in hbase:meta and namespace have been created"); log("Regions in hbase:meta and namespace have been created");
// at this point we only expect 3 regions to be assigned out (catalogs and namespace) // at this point we only expect 4 regions to be assigned out
assertEquals(2, cluster.countServedRegions()); // (catalogs and namespace, + 2 merging regions)
assertEquals(4, cluster.countServedRegions());
// Move merging regions to the same region server
AssignmentManager am = master.getAssignmentManager();
RegionStates regionStates = am.getRegionStates();
List<HRegionInfo> mergingRegions = regionStates.getRegionsOfTable(tableWithMergingRegions);
assertEquals(2, mergingRegions.size());
HRegionInfo a = mergingRegions.get(0);
HRegionInfo b = mergingRegions.get(1);
HRegionInfo newRegion = RegionMergeTransaction.getMergedRegionInfo(a, b);
ServerName mergingServer = regionStates.getRegionServerOfRegion(a);
ServerName serverB = regionStates.getRegionServerOfRegion(b);
if (!serverB.equals(mergingServer)) {
RegionPlan plan = new RegionPlan(b, serverB, mergingServer);
am.balance(plan);
assertTrue(am.waitForAssignment(b));
}
// Let's just assign everything to first RS // Let's just assign everything to first RS
HRegionServer hrs = cluster.getRegionServer(0); HRegionServer hrs = cluster.getRegionServer(0);
@ -339,6 +361,15 @@ public class TestMasterFailover {
Thread.sleep(100); Thread.sleep(100);
} }
/*
* ZK = MERGING
*/
// Regions of table of merging regions
// Cause: Master was down while merging was going on
RegionMergeTransaction.createNodeMerging(
zkw, newRegion, mergingServer, a, b);
/* /*
* ZK = NONE * ZK = NONE
*/ */
@ -356,6 +387,16 @@ public class TestMasterFailover {
cluster.waitForActiveAndReadyMaster(); cluster.waitForActiveAndReadyMaster();
log("Master is ready"); log("Master is ready");
// Get new region states since master restarted
regionStates = master.getAssignmentManager().getRegionStates();
// Merging region should remain merging
assertTrue(regionStates.isRegionInState(a, State.MERGING));
assertTrue(regionStates.isRegionInState(b, State.MERGING));
assertTrue(regionStates.isRegionInState(newRegion, State.MERGING_NEW));
// Now remove the faked merging znode, merging regions should be
// offlined automatically, otherwise it is a bug in AM.
ZKAssign.deleteNodeFailSilent(zkw, newRegion);
// Failover should be completed, now wait for no RIT // Failover should be completed, now wait for no RIT
log("Waiting for no more RIT"); log("Waiting for no more RIT");
ZKAssign.blockUntilNoRIT(zkw); ZKAssign.blockUntilNoRIT(zkw);
@ -375,6 +416,9 @@ public class TestMasterFailover {
// Everything that should be offline should not be online // Everything that should be offline should not be online
for (HRegionInfo hri : regionsThatShouldBeOffline) { for (HRegionInfo hri : regionsThatShouldBeOffline) {
if (onlineRegions.contains(hri)) {
LOG.debug(hri);
}
assertFalse(onlineRegions.contains(hri)); assertFalse(onlineRegions.contains(hri));
} }
@ -384,7 +428,6 @@ public class TestMasterFailover {
TEST_UTIL.shutdownMiniCluster(); TEST_UTIL.shutdownMiniCluster();
} }
/** /**
* Complex test of master failover that tests as many permutations of the * Complex test of master failover that tests as many permutations of the
* different possible states that regions in transition could be in within ZK * different possible states that regions in transition could be in within ZK
@ -794,7 +837,8 @@ public class TestMasterFailover {
long maxTime = 120000; long maxTime = 120000;
boolean done = master.assignmentManager.waitUntilNoRegionsInTransition(maxTime); boolean done = master.assignmentManager.waitUntilNoRegionsInTransition(maxTime);
if (!done) { if (!done) {
LOG.info("rit=" + master.getAssignmentManager().getRegionStates().getRegionsInTransition()); RegionStates regionStates = master.getAssignmentManager().getRegionStates();
LOG.info("rit=" + regionStates.getRegionsInTransition());
} }
long elapsed = System.currentTimeMillis() - now; long elapsed = System.currentTimeMillis() - now;
assertTrue("Elapsed=" + elapsed + ", maxTime=" + maxTime + ", done=" + done, assertTrue("Elapsed=" + elapsed + ", maxTime=" + maxTime + ", done=" + done,

View File

@ -74,7 +74,6 @@ import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.handler.SplitRegionHandler;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -262,8 +261,6 @@ public class TestSplitTransactionOnCluster {
HTable t = createTableAndWait(tableName.getName(), Bytes.toBytes("cf")); HTable t = createTableAndWait(tableName.getName(), Bytes.toBytes("cf"));
final List<HRegion> regions = cluster.getRegions(tableName); final List<HRegion> regions = cluster.getRegions(tableName);
final HRegionInfo hri = getAndCheckSingleTableRegion(regions); final HRegionInfo hri = getAndCheckSingleTableRegion(regions);
int regionServerIndex = cluster.getServerWith(regions.get(0).getRegionName());
final HRegionServer regionServer = cluster.getRegionServer(regionServerIndex);
insertData(tableName.getName(), admin, t); insertData(tableName.getName(), admin, t);
t.close(); t.close();
@ -349,7 +346,7 @@ public class TestSplitTransactionOnCluster {
int regionCount = ProtobufUtil.getOnlineRegions(server).size(); int regionCount = ProtobufUtil.getOnlineRegions(server).size();
// Now, before we split, set special flag in master, a flag that has // Now, before we split, set special flag in master, a flag that has
// it FAIL the processing of split. // it FAIL the processing of split.
SplitRegionHandler.TEST_SKIP = true; AssignmentManager.TEST_SKIP_SPLIT_HANDLING = true;
// Now try splitting and it should work. // Now try splitting and it should work.
split(hri, server, regionCount); split(hri, server, regionCount);
// Get daughters // Get daughters
@ -357,15 +354,18 @@ public class TestSplitTransactionOnCluster {
// Assert the ephemeral node is up in zk. // Assert the ephemeral node is up in zk.
String path = ZKAssign.getNodeName(TESTING_UTIL.getZooKeeperWatcher(), String path = ZKAssign.getNodeName(TESTING_UTIL.getZooKeeperWatcher(),
hri.getEncodedName()); hri.getEncodedName());
Stat stats = RegionTransition rt = null;
TESTING_UTIL.getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false); Stat stats = null;
LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats=" + stats); // Wait till the znode moved to SPLIT
RegionTransition rt = for (int i=0; i<100; i++) {
RegionTransition.parseFrom(ZKAssign.getData(TESTING_UTIL.getZooKeeperWatcher(), stats = TESTING_UTIL.getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false);
rt = RegionTransition.parseFrom(ZKAssign.getData(TESTING_UTIL.getZooKeeperWatcher(),
hri.getEncodedName())); hri.getEncodedName()));
// State could be SPLIT or SPLITTING. if (rt.getEventType().equals(EventType.RS_ZK_REGION_SPLIT)) break;
assertTrue(rt.getEventType().equals(EventType.RS_ZK_REGION_SPLIT) || Thread.sleep(100);
rt.getEventType().equals(EventType.RS_ZK_REGION_SPLITTING)); }
LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats=" + stats);
assertTrue(rt != null && rt.getEventType().equals(EventType.RS_ZK_REGION_SPLIT));
// Now crash the server // Now crash the server
cluster.abortRegionServer(tableRegionIndex); cluster.abortRegionServer(tableRegionIndex);
waitUntilRegionServerDead(); waitUntilRegionServerDead();
@ -387,7 +387,7 @@ public class TestSplitTransactionOnCluster {
assertTrue(stats == null); assertTrue(stats == null);
} finally { } finally {
// Set this flag back. // Set this flag back.
SplitRegionHandler.TEST_SKIP = false; AssignmentManager.TEST_SKIP_SPLIT_HANDLING = false;
admin.setBalancerRunning(true, false); admin.setBalancerRunning(true, false);
cluster.getMaster().setCatalogJanitorEnabled(true); cluster.getMaster().setCatalogJanitorEnabled(true);
t.close(); t.close();
@ -645,7 +645,7 @@ public class TestSplitTransactionOnCluster {
printOutRegions(server, "Initial regions: "); printOutRegions(server, "Initial regions: ");
// Now, before we split, set special flag in master, a flag that has // Now, before we split, set special flag in master, a flag that has
// it FAIL the processing of split. // it FAIL the processing of split.
SplitRegionHandler.TEST_SKIP = true; AssignmentManager.TEST_SKIP_SPLIT_HANDLING = true;
// Now try splitting and it should work. // Now try splitting and it should work.
this.admin.split(hri.getRegionNameAsString()); this.admin.split(hri.getRegionNameAsString());
@ -675,7 +675,7 @@ public class TestSplitTransactionOnCluster {
assertTrue(regionServerOfRegion != null); assertTrue(regionServerOfRegion != null);
// Remove the block so that split can move ahead. // Remove the block so that split can move ahead.
SplitRegionHandler.TEST_SKIP = false; AssignmentManager.TEST_SKIP_SPLIT_HANDLING = false;
String node = ZKAssign.getNodeName(zkw, hri.getEncodedName()); String node = ZKAssign.getNodeName(zkw, hri.getEncodedName());
Stat stat = new Stat(); Stat stat = new Stat();
byte[] data = ZKUtil.getDataNoWatch(zkw, node, stat); byte[] data = ZKUtil.getDataNoWatch(zkw, node, stat);
@ -692,7 +692,7 @@ public class TestSplitTransactionOnCluster {
assertTrue(regionServerOfRegion == null); assertTrue(regionServerOfRegion == null);
} finally { } finally {
// Set this flag back. // Set this flag back.
SplitRegionHandler.TEST_SKIP = false; AssignmentManager.TEST_SKIP_SPLIT_HANDLING = false;
admin.setBalancerRunning(true, false); admin.setBalancerRunning(true, false);
cluster.getMaster().setCatalogJanitorEnabled(true); cluster.getMaster().setCatalogJanitorEnabled(true);
t.close(); t.close();
@ -765,8 +765,6 @@ public class TestSplitTransactionOnCluster {
ServerName regionServerOfRegion = regionStates.getRegionServerOfRegion(hri); ServerName regionServerOfRegion = regionStates.getRegionServerOfRegion(hri);
assertTrue(regionServerOfRegion == null); assertTrue(regionServerOfRegion == null);
} finally { } finally {
// Set this flag back.
SplitRegionHandler.TEST_SKIP = false;
this.admin.setBalancerRunning(true, false); this.admin.setBalancerRunning(true, false);
cluster.getMaster().setCatalogJanitorEnabled(true); cluster.getMaster().setCatalogJanitorEnabled(true);
t.close(); t.close();
@ -998,8 +996,8 @@ public class TestSplitTransactionOnCluster {
assertTrue("not able to find a splittable region", region != null); assertTrue("not able to find a splittable region", region != null);
SplitTransaction st = new MockedSplitTransaction(region, Bytes.toBytes("row2")) { SplitTransaction st = new MockedSplitTransaction(region, Bytes.toBytes("row2")) {
@Override @Override
int createNodeSplitting(ZooKeeperWatcher zkw, HRegionInfo region, public PairOfSameType<HRegion> stepsBeforePONR(final Server server,
ServerName serverName) throws KeeperException, IOException { final RegionServerServices services, boolean testing) throws IOException {
throw new SplittingNodeCreationFailedException (); throw new SplittingNodeCreationFailedException ();
} }
}; };