HBASE-9098: During recovery use ZK as the source of truth for region state - v2
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1512553 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
28eb922551
commit
e80484d7bf
@ -53,6 +53,7 @@ public class IntegrationTestDataIngestWithChaosMonkey extends IngestIntegrationT
|
||||
// small enough for the replay to complete before ChaosMonkey kills another region
|
||||
// server
|
||||
conf.setInt("hbase.log.replay.retries.number", 2);
|
||||
conf.setInt("hbase.log.replay.rpc.timeout", 2000);
|
||||
conf.setBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, true);
|
||||
}
|
||||
if(!util.isDistributedCluster()) {
|
||||
|
@ -558,6 +558,10 @@ public final class AdminProtos {
|
||||
// optional .GetRegionInfoResponse.CompactionState compaction_state = 2;
|
||||
boolean hasCompactionState();
|
||||
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState getCompactionState();
|
||||
|
||||
// optional bool isRecovering = 3;
|
||||
boolean hasIsRecovering();
|
||||
boolean getIsRecovering();
|
||||
}
|
||||
public static final class GetRegionInfoResponse extends
|
||||
com.google.protobuf.GeneratedMessage
|
||||
@ -686,9 +690,20 @@ public final class AdminProtos {
|
||||
return compactionState_;
|
||||
}
|
||||
|
||||
// optional bool isRecovering = 3;
|
||||
public static final int ISRECOVERING_FIELD_NUMBER = 3;
|
||||
private boolean isRecovering_;
|
||||
public boolean hasIsRecovering() {
|
||||
return ((bitField0_ & 0x00000004) == 0x00000004);
|
||||
}
|
||||
public boolean getIsRecovering() {
|
||||
return isRecovering_;
|
||||
}
|
||||
|
||||
private void initFields() {
|
||||
regionInfo_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo.getDefaultInstance();
|
||||
compactionState_ = org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState.NONE;
|
||||
isRecovering_ = false;
|
||||
}
|
||||
private byte memoizedIsInitialized = -1;
|
||||
public final boolean isInitialized() {
|
||||
@ -716,6 +731,9 @@ public final class AdminProtos {
|
||||
if (((bitField0_ & 0x00000002) == 0x00000002)) {
|
||||
output.writeEnum(2, compactionState_.getNumber());
|
||||
}
|
||||
if (((bitField0_ & 0x00000004) == 0x00000004)) {
|
||||
output.writeBool(3, isRecovering_);
|
||||
}
|
||||
getUnknownFields().writeTo(output);
|
||||
}
|
||||
|
||||
@ -733,6 +751,10 @@ public final class AdminProtos {
|
||||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeEnumSize(2, compactionState_.getNumber());
|
||||
}
|
||||
if (((bitField0_ & 0x00000004) == 0x00000004)) {
|
||||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeBoolSize(3, isRecovering_);
|
||||
}
|
||||
size += getUnknownFields().getSerializedSize();
|
||||
memoizedSerializedSize = size;
|
||||
return size;
|
||||
@ -766,6 +788,11 @@ public final class AdminProtos {
|
||||
result = result &&
|
||||
(getCompactionState() == other.getCompactionState());
|
||||
}
|
||||
result = result && (hasIsRecovering() == other.hasIsRecovering());
|
||||
if (hasIsRecovering()) {
|
||||
result = result && (getIsRecovering()
|
||||
== other.getIsRecovering());
|
||||
}
|
||||
result = result &&
|
||||
getUnknownFields().equals(other.getUnknownFields());
|
||||
return result;
|
||||
@ -783,6 +810,10 @@ public final class AdminProtos {
|
||||
hash = (37 * hash) + COMPACTION_STATE_FIELD_NUMBER;
|
||||
hash = (53 * hash) + hashEnum(getCompactionState());
|
||||
}
|
||||
if (hasIsRecovering()) {
|
||||
hash = (37 * hash) + ISRECOVERING_FIELD_NUMBER;
|
||||
hash = (53 * hash) + hashBoolean(getIsRecovering());
|
||||
}
|
||||
hash = (29 * hash) + getUnknownFields().hashCode();
|
||||
return hash;
|
||||
}
|
||||
@ -908,6 +939,8 @@ public final class AdminProtos {
|
||||
bitField0_ = (bitField0_ & ~0x00000001);
|
||||
compactionState_ = org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState.NONE;
|
||||
bitField0_ = (bitField0_ & ~0x00000002);
|
||||
isRecovering_ = false;
|
||||
bitField0_ = (bitField0_ & ~0x00000004);
|
||||
return this;
|
||||
}
|
||||
|
||||
@ -958,6 +991,10 @@ public final class AdminProtos {
|
||||
to_bitField0_ |= 0x00000002;
|
||||
}
|
||||
result.compactionState_ = compactionState_;
|
||||
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
|
||||
to_bitField0_ |= 0x00000004;
|
||||
}
|
||||
result.isRecovering_ = isRecovering_;
|
||||
result.bitField0_ = to_bitField0_;
|
||||
onBuilt();
|
||||
return result;
|
||||
@ -980,6 +1017,9 @@ public final class AdminProtos {
|
||||
if (other.hasCompactionState()) {
|
||||
setCompactionState(other.getCompactionState());
|
||||
}
|
||||
if (other.hasIsRecovering()) {
|
||||
setIsRecovering(other.getIsRecovering());
|
||||
}
|
||||
this.mergeUnknownFields(other.getUnknownFields());
|
||||
return this;
|
||||
}
|
||||
@ -1039,6 +1079,11 @@ public final class AdminProtos {
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 24: {
|
||||
bitField0_ |= 0x00000004;
|
||||
isRecovering_ = input.readBool();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1159,6 +1204,27 @@ public final class AdminProtos {
|
||||
return this;
|
||||
}
|
||||
|
||||
// optional bool isRecovering = 3;
|
||||
private boolean isRecovering_ ;
|
||||
public boolean hasIsRecovering() {
|
||||
return ((bitField0_ & 0x00000004) == 0x00000004);
|
||||
}
|
||||
public boolean getIsRecovering() {
|
||||
return isRecovering_;
|
||||
}
|
||||
public Builder setIsRecovering(boolean value) {
|
||||
bitField0_ |= 0x00000004;
|
||||
isRecovering_ = value;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
public Builder clearIsRecovering() {
|
||||
bitField0_ = (bitField0_ & ~0x00000004);
|
||||
isRecovering_ = false;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
// @@protoc_insertion_point(builder_scope:GetRegionInfoResponse)
|
||||
}
|
||||
|
||||
@ -15389,78 +15455,79 @@ public final class AdminProtos {
|
||||
"\n\013Admin.proto\032\014Client.proto\032\013hbase.proto" +
|
||||
"\032\tWAL.proto\"R\n\024GetRegionInfoRequest\022 \n\006r" +
|
||||
"egion\030\001 \002(\0132\020.RegionSpecifier\022\030\n\020compact" +
|
||||
"ion_state\030\002 \001(\010\"\303\001\n\025GetRegionInfoRespons" +
|
||||
"ion_state\030\002 \001(\010\"\331\001\n\025GetRegionInfoRespons" +
|
||||
"e\022 \n\013region_info\030\001 \002(\0132\013.RegionInfo\022@\n\020c" +
|
||||
"ompaction_state\030\002 \001(\0162&.GetRegionInfoRes" +
|
||||
"ponse.CompactionState\"F\n\017CompactionState" +
|
||||
"\022\010\n\004NONE\020\000\022\t\n\005MINOR\020\001\022\t\n\005MAJOR\020\002\022\023\n\017MAJO" +
|
||||
"R_AND_MINOR\020\003\"G\n\023GetStoreFileRequest\022 \n\006" +
|
||||
"region\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006family",
|
||||
"\030\002 \003(\014\"*\n\024GetStoreFileResponse\022\022\n\nstore_" +
|
||||
"file\030\001 \003(\t\"\030\n\026GetOnlineRegionRequest\";\n\027" +
|
||||
"GetOnlineRegionResponse\022 \n\013region_info\030\001" +
|
||||
" \003(\0132\013.RegionInfo\"\275\001\n\021OpenRegionRequest\022" +
|
||||
"4\n\topen_info\030\001 \003(\0132!.OpenRegionRequest.R" +
|
||||
"egionOpenInfo\032r\n\016RegionOpenInfo\022\033\n\006regio" +
|
||||
"n\030\001 \002(\0132\013.RegionInfo\022\037\n\027version_of_offli" +
|
||||
"ne_node\030\002 \001(\r\022\"\n\rfavored_nodes\030\003 \003(\0132\013.S" +
|
||||
"erverName\"\235\001\n\022OpenRegionResponse\022=\n\ropen" +
|
||||
"ing_state\030\001 \003(\0162&.OpenRegionResponse.Reg",
|
||||
"ionOpeningState\"H\n\022RegionOpeningState\022\n\n" +
|
||||
"\006OPENED\020\000\022\022\n\016ALREADY_OPENED\020\001\022\022\n\016FAILED_" +
|
||||
"OPENING\020\002\"\240\001\n\022CloseRegionRequest\022 \n\006regi" +
|
||||
"on\030\001 \002(\0132\020.RegionSpecifier\022\037\n\027version_of" +
|
||||
"_closing_node\030\002 \001(\r\022\036\n\020transition_in_ZK\030" +
|
||||
"\003 \001(\010:\004true\022\'\n\022destination_server\030\004 \001(\0132" +
|
||||
"\013.ServerName\"%\n\023CloseRegionResponse\022\016\n\006c" +
|
||||
"losed\030\001 \002(\010\"P\n\022FlushRegionRequest\022 \n\006reg" +
|
||||
"ion\030\001 \002(\0132\020.RegionSpecifier\022\030\n\020if_older_" +
|
||||
"than_ts\030\002 \001(\004\"?\n\023FlushRegionResponse\022\027\n\017",
|
||||
"last_flush_time\030\001 \002(\004\022\017\n\007flushed\030\002 \001(\010\"K" +
|
||||
"\n\022SplitRegionRequest\022 \n\006region\030\001 \002(\0132\020.R" +
|
||||
"egionSpecifier\022\023\n\013split_point\030\002 \001(\014\"\025\n\023S" +
|
||||
"plitRegionResponse\"W\n\024CompactRegionReque" +
|
||||
"st\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\r\n\005" +
|
||||
"major\030\002 \001(\010\022\016\n\006family\030\003 \001(\014\"\027\n\025CompactRe" +
|
||||
"gionResponse\"v\n\023MergeRegionsRequest\022\"\n\010r" +
|
||||
"egion_a\030\001 \002(\0132\020.RegionSpecifier\022\"\n\010regio" +
|
||||
"n_b\030\002 \002(\0132\020.RegionSpecifier\022\027\n\010forcible\030" +
|
||||
"\003 \001(\010:\005false\"\026\n\024MergeRegionsResponse\"X\n\010",
|
||||
"WALEntry\022\024\n\003key\030\001 \002(\0132\007.WALKey\022\027\n\017key_va" +
|
||||
"lue_bytes\030\002 \003(\014\022\035\n\025associated_cell_count" +
|
||||
"\030\003 \001(\005\"4\n\030ReplicateWALEntryRequest\022\030\n\005en" +
|
||||
"try\030\001 \003(\0132\t.WALEntry\"\033\n\031ReplicateWALEntr" +
|
||||
"yResponse\"\026\n\024RollWALWriterRequest\"0\n\025Rol" +
|
||||
"lWALWriterResponse\022\027\n\017region_to_flush\030\001 " +
|
||||
"\003(\014\"#\n\021StopServerRequest\022\016\n\006reason\030\001 \002(\t" +
|
||||
"\"\024\n\022StopServerResponse\"\026\n\024GetServerInfoR" +
|
||||
"equest\"B\n\nServerInfo\022 \n\013server_name\030\001 \002(" +
|
||||
"\0132\013.ServerName\022\022\n\nwebui_port\030\002 \001(\r\"9\n\025Ge",
|
||||
"tServerInfoResponse\022 \n\013server_info\030\001 \002(\013" +
|
||||
"2\013.ServerInfo2\337\006\n\014AdminService\022>\n\rGetReg" +
|
||||
"ionInfo\022\025.GetRegionInfoRequest\032\026.GetRegi" +
|
||||
"onInfoResponse\022;\n\014GetStoreFile\022\024.GetStor" +
|
||||
"eFileRequest\032\025.GetStoreFileResponse\022D\n\017G" +
|
||||
"etOnlineRegion\022\027.GetOnlineRegionRequest\032" +
|
||||
"\030.GetOnlineRegionResponse\0225\n\nOpenRegion\022" +
|
||||
"\022.OpenRegionRequest\032\023.OpenRegionResponse" +
|
||||
"\0228\n\013CloseRegion\022\023.CloseRegionRequest\032\024.C" +
|
||||
"loseRegionResponse\0228\n\013FlushRegion\022\023.Flus",
|
||||
"hRegionRequest\032\024.FlushRegionResponse\0228\n\013" +
|
||||
"SplitRegion\022\023.SplitRegionRequest\032\024.Split" +
|
||||
"RegionResponse\022>\n\rCompactRegion\022\025.Compac" +
|
||||
"tRegionRequest\032\026.CompactRegionResponse\022;" +
|
||||
"\n\014MergeRegions\022\024.MergeRegionsRequest\032\025.M" +
|
||||
"ergeRegionsResponse\022J\n\021ReplicateWALEntry" +
|
||||
"\022\031.ReplicateWALEntryRequest\032\032.ReplicateW" +
|
||||
"ALEntryResponse\022\'\n\006Replay\022\r.MultiRequest" +
|
||||
"\032\016.MultiResponse\022>\n\rRollWALWriter\022\025.Roll" +
|
||||
"WALWriterRequest\032\026.RollWALWriterResponse",
|
||||
"\022>\n\rGetServerInfo\022\025.GetServerInfoRequest" +
|
||||
"\032\026.GetServerInfoResponse\0225\n\nStopServer\022\022" +
|
||||
".StopServerRequest\032\023.StopServerResponseB" +
|
||||
"A\n*org.apache.hadoop.hbase.protobuf.gene" +
|
||||
"ratedB\013AdminProtosH\001\210\001\001\240\001\001"
|
||||
"ponse.CompactionState\022\024\n\014isRecovering\030\003 " +
|
||||
"\001(\010\"F\n\017CompactionState\022\010\n\004NONE\020\000\022\t\n\005MINO" +
|
||||
"R\020\001\022\t\n\005MAJOR\020\002\022\023\n\017MAJOR_AND_MINOR\020\003\"G\n\023G" +
|
||||
"etStoreFileRequest\022 \n\006region\030\001 \002(\0132\020.Reg",
|
||||
"ionSpecifier\022\016\n\006family\030\002 \003(\014\"*\n\024GetStore" +
|
||||
"FileResponse\022\022\n\nstore_file\030\001 \003(\t\"\030\n\026GetO" +
|
||||
"nlineRegionRequest\";\n\027GetOnlineRegionRes" +
|
||||
"ponse\022 \n\013region_info\030\001 \003(\0132\013.RegionInfo\"" +
|
||||
"\275\001\n\021OpenRegionRequest\0224\n\topen_info\030\001 \003(\013" +
|
||||
"2!.OpenRegionRequest.RegionOpenInfo\032r\n\016R" +
|
||||
"egionOpenInfo\022\033\n\006region\030\001 \002(\0132\013.RegionIn" +
|
||||
"fo\022\037\n\027version_of_offline_node\030\002 \001(\r\022\"\n\rf" +
|
||||
"avored_nodes\030\003 \003(\0132\013.ServerName\"\235\001\n\022Open" +
|
||||
"RegionResponse\022=\n\ropening_state\030\001 \003(\0162&.",
|
||||
"OpenRegionResponse.RegionOpeningState\"H\n" +
|
||||
"\022RegionOpeningState\022\n\n\006OPENED\020\000\022\022\n\016ALREA" +
|
||||
"DY_OPENED\020\001\022\022\n\016FAILED_OPENING\020\002\"\240\001\n\022Clos" +
|
||||
"eRegionRequest\022 \n\006region\030\001 \002(\0132\020.RegionS" +
|
||||
"pecifier\022\037\n\027version_of_closing_node\030\002 \001(" +
|
||||
"\r\022\036\n\020transition_in_ZK\030\003 \001(\010:\004true\022\'\n\022des" +
|
||||
"tination_server\030\004 \001(\0132\013.ServerName\"%\n\023Cl" +
|
||||
"oseRegionResponse\022\016\n\006closed\030\001 \002(\010\"P\n\022Flu" +
|
||||
"shRegionRequest\022 \n\006region\030\001 \002(\0132\020.Region" +
|
||||
"Specifier\022\030\n\020if_older_than_ts\030\002 \001(\004\"?\n\023F",
|
||||
"lushRegionResponse\022\027\n\017last_flush_time\030\001 " +
|
||||
"\002(\004\022\017\n\007flushed\030\002 \001(\010\"K\n\022SplitRegionReque" +
|
||||
"st\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\023\n\013" +
|
||||
"split_point\030\002 \001(\014\"\025\n\023SplitRegionResponse" +
|
||||
"\"W\n\024CompactRegionRequest\022 \n\006region\030\001 \002(\013" +
|
||||
"2\020.RegionSpecifier\022\r\n\005major\030\002 \001(\010\022\016\n\006fam" +
|
||||
"ily\030\003 \001(\014\"\027\n\025CompactRegionResponse\"v\n\023Me" +
|
||||
"rgeRegionsRequest\022\"\n\010region_a\030\001 \002(\0132\020.Re" +
|
||||
"gionSpecifier\022\"\n\010region_b\030\002 \002(\0132\020.Region" +
|
||||
"Specifier\022\027\n\010forcible\030\003 \001(\010:\005false\"\026\n\024Me",
|
||||
"rgeRegionsResponse\"X\n\010WALEntry\022\024\n\003key\030\001 " +
|
||||
"\002(\0132\007.WALKey\022\027\n\017key_value_bytes\030\002 \003(\014\022\035\n" +
|
||||
"\025associated_cell_count\030\003 \001(\005\"4\n\030Replicat" +
|
||||
"eWALEntryRequest\022\030\n\005entry\030\001 \003(\0132\t.WALEnt" +
|
||||
"ry\"\033\n\031ReplicateWALEntryResponse\"\026\n\024RollW" +
|
||||
"ALWriterRequest\"0\n\025RollWALWriterResponse" +
|
||||
"\022\027\n\017region_to_flush\030\001 \003(\014\"#\n\021StopServerR" +
|
||||
"equest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServerResp" +
|
||||
"onse\"\026\n\024GetServerInfoRequest\"B\n\nServerIn" +
|
||||
"fo\022 \n\013server_name\030\001 \002(\0132\013.ServerName\022\022\n\n",
|
||||
"webui_port\030\002 \001(\r\"9\n\025GetServerInfoRespons" +
|
||||
"e\022 \n\013server_info\030\001 \002(\0132\013.ServerInfo2\337\006\n\014" +
|
||||
"AdminService\022>\n\rGetRegionInfo\022\025.GetRegio" +
|
||||
"nInfoRequest\032\026.GetRegionInfoResponse\022;\n\014" +
|
||||
"GetStoreFile\022\024.GetStoreFileRequest\032\025.Get" +
|
||||
"StoreFileResponse\022D\n\017GetOnlineRegion\022\027.G" +
|
||||
"etOnlineRegionRequest\032\030.GetOnlineRegionR" +
|
||||
"esponse\0225\n\nOpenRegion\022\022.OpenRegionReques" +
|
||||
"t\032\023.OpenRegionResponse\0228\n\013CloseRegion\022\023." +
|
||||
"CloseRegionRequest\032\024.CloseRegionResponse",
|
||||
"\0228\n\013FlushRegion\022\023.FlushRegionRequest\032\024.F" +
|
||||
"lushRegionResponse\0228\n\013SplitRegion\022\023.Spli" +
|
||||
"tRegionRequest\032\024.SplitRegionResponse\022>\n\r" +
|
||||
"CompactRegion\022\025.CompactRegionRequest\032\026.C" +
|
||||
"ompactRegionResponse\022;\n\014MergeRegions\022\024.M" +
|
||||
"ergeRegionsRequest\032\025.MergeRegionsRespons" +
|
||||
"e\022J\n\021ReplicateWALEntry\022\031.ReplicateWALEnt" +
|
||||
"ryRequest\032\032.ReplicateWALEntryResponse\022\'\n" +
|
||||
"\006Replay\022\r.MultiRequest\032\016.MultiResponse\022>" +
|
||||
"\n\rRollWALWriter\022\025.RollWALWriterRequest\032\026",
|
||||
".RollWALWriterResponse\022>\n\rGetServerInfo\022" +
|
||||
"\025.GetServerInfoRequest\032\026.GetServerInfoRe" +
|
||||
"sponse\0225\n\nStopServer\022\022.StopServerRequest" +
|
||||
"\032\023.StopServerResponseBA\n*org.apache.hado" +
|
||||
"op.hbase.protobuf.generatedB\013AdminProtos" +
|
||||
"H\001\210\001\001\240\001\001"
|
||||
};
|
||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||
@ -15480,7 +15547,7 @@ public final class AdminProtos {
|
||||
internal_static_GetRegionInfoResponse_fieldAccessorTable = new
|
||||
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
||||
internal_static_GetRegionInfoResponse_descriptor,
|
||||
new java.lang.String[] { "RegionInfo", "CompactionState", },
|
||||
new java.lang.String[] { "RegionInfo", "CompactionState", "IsRecovering", },
|
||||
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.class,
|
||||
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.Builder.class);
|
||||
internal_static_GetStoreFileRequest_descriptor =
|
||||
|
@ -36,6 +36,7 @@ message GetRegionInfoRequest {
|
||||
message GetRegionInfoResponse {
|
||||
required RegionInfo region_info = 1;
|
||||
optional CompactionState compaction_state = 2;
|
||||
optional bool isRecovering = 3;
|
||||
|
||||
enum CompactionState {
|
||||
NONE = 0;
|
||||
|
@ -335,6 +335,30 @@ public class MasterFileSystem {
|
||||
return logDirs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark regions as recovering in ZooKeeper when distributedLogReplay is set to true
|
||||
* @param serverNames Set of ServerNames whose WALs are to be replayed in order to recover the changes contained
|
||||
* in them
|
||||
* @throws IOException if the regions cannot be marked as recovering in ZooKeeper
|
||||
*/
|
||||
public void prepareLogReplay(Set<ServerName> serverNames) throws IOException {
|
||||
if (!this.distributedLogReplay) {
|
||||
return;
|
||||
}
|
||||
// mark regions in recovering state
|
||||
for (ServerName serverName : serverNames) {
|
||||
NavigableMap<HRegionInfo, Result> regions = this.getServerUserRegions(serverName);
|
||||
if (regions == null) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
this.splitLogManager.markRegionsRecoveringInZK(serverName, regions.keySet());
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark regions in recovering state when distributedLogReplay is set to true
|
||||
* @param serverName Failed region server whose wals to be replayed
|
||||
|
@ -55,9 +55,15 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler {
|
||||
try {
|
||||
AssignmentManager am = this.services.getAssignmentManager();
|
||||
try {
|
||||
if (this.shouldSplitHlog && !this.distributedLogReplay) {
|
||||
if (this.shouldSplitHlog) {
|
||||
LOG.info("Splitting META logs for " + serverName);
|
||||
this.services.getMasterFileSystem().splitMetaLog(serverName);
|
||||
if (this.distributedLogReplay) {
|
||||
Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
|
||||
regions.add(HRegionInfo.FIRST_META_REGIONINFO);
|
||||
this.services.getMasterFileSystem().prepareLogReplay(serverName, regions);
|
||||
} else {
|
||||
this.services.getMasterFileSystem().splitMetaLog(serverName);
|
||||
}
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
this.services.getExecutorService().submit(this);
|
||||
@ -155,21 +161,6 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler {
|
||||
long waitTime = this.server.getConfiguration().getLong(
|
||||
"hbase.catalog.verification.timeout", 1000);
|
||||
|
||||
if (this.shouldSplitHlog && this.distributedLogReplay) {
|
||||
LOG.info("Splitting META logs for " + serverName
|
||||
+ ". Mark META region in recovery before assignment.");
|
||||
Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
|
||||
regions.add(HRegionInfo.FIRST_META_REGIONINFO);
|
||||
try {
|
||||
this.services.getMasterFileSystem().prepareLogReplay(serverName, regions);
|
||||
} catch (IOException ioe) {
|
||||
this.services.getExecutorService().submit(this);
|
||||
this.deadServers.add(serverName);
|
||||
throw new IOException("failed to mark META region in recovery on " + serverName
|
||||
+ ", will retry", ioe);
|
||||
}
|
||||
}
|
||||
|
||||
int iFlag = 0;
|
||||
while (true) {
|
||||
try {
|
||||
|
@ -182,9 +182,16 @@ public class ServerShutdownHandler extends EventHandler {
|
||||
}
|
||||
|
||||
try {
|
||||
if (this.shouldSplitHlog && !this.distributedLogReplay) {
|
||||
if (this.shouldSplitHlog) {
|
||||
LOG.info("Splitting logs for " + serverName + " before assignment.");
|
||||
this.services.getMasterFileSystem().splitLog(serverName);
|
||||
if (this.distributedLogReplay) {
|
||||
LOG.info("Mark regions in recovery before assignment.");
|
||||
Set<ServerName> serverNames = new HashSet<ServerName>();
|
||||
serverNames.add(serverName);
|
||||
this.services.getMasterFileSystem().prepareLogReplay(serverNames);
|
||||
} else {
|
||||
this.services.getMasterFileSystem().splitLog(serverName);
|
||||
}
|
||||
} else {
|
||||
LOG.info("Skipping log splitting for " + serverName);
|
||||
}
|
||||
@ -259,18 +266,6 @@ public class ServerShutdownHandler extends EventHandler {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (this.shouldSplitHlog && this.distributedLogReplay) {
|
||||
try {
|
||||
LOG.info("Splitting logs for " + serverName
|
||||
+ ". Mark regions in recovery before assignment.");
|
||||
Set<HRegionInfo> toAssignRegionSet = new HashSet<HRegionInfo>();
|
||||
toAssignRegionSet.addAll(toAssignRegions);
|
||||
this.services.getMasterFileSystem().prepareLogReplay(serverName, toAssignRegionSet);
|
||||
} catch (IOException ioe) {
|
||||
resubmit(serverName, ioe);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
am.assign(toAssignRegions);
|
||||
|
@ -213,7 +213,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||
*/
|
||||
protected enum Operation {
|
||||
ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
|
||||
REPLAY_BATCH_MUTATE
|
||||
REPLAY_BATCH_MUTATE, COMPACT_REGION
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
@ -5287,6 +5287,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||
case PUT:
|
||||
case DELETE:
|
||||
case BATCH_MUTATE:
|
||||
case COMPACT_REGION:
|
||||
// when a region is in recovering state, no read, split, merge or compaction is allowed
|
||||
if (this.isRecovering() && (this.disallowWritesInRecovering ||
|
||||
(op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
|
||||
@ -5296,8 +5297,10 @@ public class HRegion implements HeapSize { // , Writable{
|
||||
default:
|
||||
break;
|
||||
}
|
||||
if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION) {
|
||||
// split or merge region doesn't need to check the closing/closed state or lock the region
|
||||
if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION
|
||||
|| op == Operation.COMPACT_REGION) {
|
||||
// split, merge or compact region doesn't need to check the closing/closed state or lock the
|
||||
// region
|
||||
return;
|
||||
}
|
||||
if (this.closing.get()) {
|
||||
|
@ -3368,6 +3368,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
||||
if (request.hasCompactionState() && request.getCompactionState()) {
|
||||
builder.setCompactionState(region.getCompactionState());
|
||||
}
|
||||
builder.setIsRecovering(region.isRecovering());
|
||||
return builder.build();
|
||||
} catch (IOException ie) {
|
||||
throw new ServiceException(ie);
|
||||
@ -3743,6 +3744,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
||||
checkOpen();
|
||||
requestCount.increment();
|
||||
HRegion region = getRegion(request.getRegion());
|
||||
region.startRegionOperation(Operation.COMPACT_REGION);
|
||||
LOG.info("Compacting " + region.getRegionNameAsString());
|
||||
boolean major = false;
|
||||
byte [] family = null;
|
||||
@ -4028,7 +4030,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
||||
cacheFlusher.reclaimMemStoreMemory();
|
||||
}
|
||||
|
||||
OperationStatus codes[] = region.batchMutate(mArray);
|
||||
OperationStatus codes[] = region.batchMutate(mArray, isReplay);
|
||||
for (i = 0; i < codes.length; i++) {
|
||||
switch (codes[i].getOperationStatusCode()) {
|
||||
case BAD_FAMILY:
|
||||
|
@ -44,6 +44,7 @@ import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@ -74,6 +75,9 @@ import org.apache.hadoop.hbase.master.SplitLogManager;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId;
|
||||
@ -97,6 +101,7 @@ import org.apache.zookeeper.KeeperException;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
/**
|
||||
* This class is responsible for splitting up a bunch of regionserver commit log
|
||||
@ -1541,43 +1546,37 @@ public class HLogSplitter {
|
||||
}
|
||||
|
||||
Long lastFlushedSequenceId = -1l;
|
||||
loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut);
|
||||
Long cachedLastFlushedSequenceId = lastFlushedSequenceIds.get(loc.getRegionInfo()
|
||||
.getEncodedName());
|
||||
AtomicBoolean isRecovering = new AtomicBoolean(true);
|
||||
loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering);
|
||||
if (!isRecovering.get()) {
|
||||
// region is not in recovering state at all, because the WAL file may contain a region that has
|
||||
// been moved to somewhere before hosting RS fails
|
||||
lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
|
||||
LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
|
||||
+ " because it's not in recovering.");
|
||||
} else {
|
||||
Long cachedLastFlushedSequenceId =
|
||||
lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
|
||||
|
||||
// retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will
|
||||
// update the value for the region
|
||||
RegionStoreSequenceIds ids =
|
||||
SplitLogManager.getRegionFlushedSequenceId(watcher, failedServerName, loc.getRegionInfo()
|
||||
.getEncodedName());
|
||||
if(ids != null) {
|
||||
lastFlushedSequenceId = ids.getLastFlushedSequenceId();
|
||||
Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
|
||||
List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList();
|
||||
for (StoreSequenceId id : maxSeqIdInStores) {
|
||||
storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId());
|
||||
// retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will
|
||||
// update the value for the region
|
||||
RegionStoreSequenceIds ids =
|
||||
SplitLogManager.getRegionFlushedSequenceId(watcher, failedServerName, loc
|
||||
.getRegionInfo().getEncodedName());
|
||||
if (ids != null) {
|
||||
lastFlushedSequenceId = ids.getLastFlushedSequenceId();
|
||||
Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
|
||||
List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList();
|
||||
for (StoreSequenceId id : maxSeqIdInStores) {
|
||||
storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId());
|
||||
}
|
||||
regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds);
|
||||
}
|
||||
regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds);
|
||||
}
|
||||
|
||||
if (cachedLastFlushedSequenceId == null
|
||||
|| lastFlushedSequenceId > cachedLastFlushedSequenceId) {
|
||||
lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
|
||||
}
|
||||
|
||||
// check if the region to be recovered is marked as recovering in ZK
|
||||
try {
|
||||
if (SplitLogManager.isRegionMarkedRecoveringInZK(watcher, loc.getRegionInfo()
|
||||
.getEncodedName()) == false) {
|
||||
// region isn't in recovering at all because WAL file may contain a region that has
|
||||
// been moved to somewhere before hosting RS fails
|
||||
lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
|
||||
LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
|
||||
+ " because it's not in recovering.");
|
||||
if (cachedLastFlushedSequenceId == null
|
||||
|| lastFlushedSequenceId > cachedLastFlushedSequenceId) {
|
||||
lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Failed to retrieve recovering state of region "
|
||||
+ loc.getRegionInfo().getEncodedName(), e);
|
||||
}
|
||||
|
||||
onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
|
||||
@ -1608,11 +1607,12 @@ public class HLogSplitter {
|
||||
* @param loc
|
||||
* @param row
|
||||
* @param timeout How long to wait
|
||||
* @param isRecovering Output parameter: set to the recovering state reported by the destination
*          region server for the region, or to true when the response does not carry the flag
|
||||
* @return the location of the region once it is online on the destination region server
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row,
|
||||
final long timeout)
|
||||
final long timeout, AtomicBoolean isRecovering)
|
||||
throws IOException {
|
||||
final long endTime = EnvironmentEdgeManager.currentTimeMillis() + timeout;
|
||||
final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
|
||||
@ -1630,8 +1630,16 @@ public class HLogSplitter {
|
||||
}
|
||||
BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName());
|
||||
HRegionInfo region = loc.getRegionInfo();
|
||||
if((region =ProtobufUtil.getRegionInfo(remoteSvr, region.getRegionName())) != null) {
|
||||
return loc;
|
||||
try {
|
||||
GetRegionInfoRequest request =
|
||||
RequestConverter.buildGetRegionInfoRequest(region.getRegionName());
|
||||
GetRegionInfoResponse response = remoteSvr.getRegionInfo(null, request);
|
||||
if (HRegionInfo.convert(response.getRegionInfo()) != null) {
|
||||
isRecovering.set((response.hasIsRecovering()) ? response.getIsRecovering() : true);
|
||||
return loc;
|
||||
}
|
||||
} catch (ServiceException se) {
|
||||
throw ProtobufUtil.getRemoteException(se);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
cause = e.getCause();
|
||||
|
@ -634,7 +634,7 @@ public class TestDistributedLogSplitting {
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testLogReplayForDisablingTable() throws Exception {
|
||||
LOG.info("testLogReplayWithNonMetaRSDown");
|
||||
LOG.info("testLogReplayForDisablingTable");
|
||||
Configuration curConf = HBaseConfiguration.create();
|
||||
curConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
|
||||
startCluster(NUM_RS, curConf);
|
||||
|
Loading…
x
Reference in New Issue
Block a user