HBASE-11094: Distributed log replay is incompatible for rolling restarts

Jeffrey Zhong 2014-06-12 21:50:16 -07:00
parent 9e824cda19
commit 55cecc9679
29 changed files with 802 additions and 258 deletions

ProtobufUtil.java

@ -1596,7 +1596,7 @@ public final class ProtobufUtil {
public static void openRegion(final AdminService.BlockingInterface admin,
ServerName server, final HRegionInfo region) throws IOException {
OpenRegionRequest request =
RequestConverter.buildOpenRegionRequest(server, region, -1, null);
RequestConverter.buildOpenRegionRequest(server, region, -1, null, null);
try {
admin.openRegion(null, request);
} catch (ServiceException se) {

RequestConverter.java

@ -21,6 +21,8 @@ import java.io.IOException;
import java.util.List;
import com.google.protobuf.HBaseZeroCopyByteString;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@ -702,17 +704,18 @@ public final class RequestConverter {
* Create a protocol buffer OpenRegionRequest to open a list of regions
*
* @param regionOpenInfos info of a list of regions to open
* @param openForReplay whether the regions are opened for distributed log replay; null leaves the field unset
* @return a protocol buffer OpenRegionRequest
*/
public static OpenRegionRequest
buildOpenRegionRequest(final List<Triple<HRegionInfo, Integer,
List<ServerName>>> regionOpenInfos) {
List<ServerName>>> regionOpenInfos, Boolean openForReplay) {
OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
for (Triple<HRegionInfo, Integer, List<ServerName>> regionOpenInfo: regionOpenInfos) {
Integer second = regionOpenInfo.getSecond();
int versionOfOfflineNode = second == null ? -1 : second.intValue();
builder.addOpenInfo(buildRegionOpenInfo(
regionOpenInfo.getFirst(), versionOfOfflineNode, regionOpenInfo.getThird()));
builder.addOpenInfo(buildRegionOpenInfo(regionOpenInfo.getFirst(), versionOfOfflineNode,
regionOpenInfo.getThird(), openForReplay));
}
return builder.build();
}
@ -724,12 +727,15 @@ public final class RequestConverter {
* @param region the region to open
* @param versionOfOfflineNode that needs to be present in the offline node
* @param favoredNodes
* @param openForReplay whether the region is opened for distributed log replay; null leaves the field unset
* @return a protocol buffer OpenRegionRequest
*/
public static OpenRegionRequest buildOpenRegionRequest(ServerName server,
final HRegionInfo region, final int versionOfOfflineNode, List<ServerName> favoredNodes) {
final HRegionInfo region, final int versionOfOfflineNode, List<ServerName> favoredNodes,
Boolean openForReplay) {
OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
builder.addOpenInfo(buildRegionOpenInfo(region, versionOfOfflineNode, favoredNodes));
builder.addOpenInfo(buildRegionOpenInfo(region, versionOfOfflineNode, favoredNodes,
openForReplay));
if (server != null) {
builder.setServerStartCode(server.getStartcode());
}
@ -1493,7 +1499,7 @@ public final class RequestConverter {
*/
private static RegionOpenInfo buildRegionOpenInfo(
final HRegionInfo region, final int versionOfOfflineNode,
final List<ServerName> favoredNodes) {
final List<ServerName> favoredNodes, Boolean openForReplay) {
RegionOpenInfo.Builder builder = RegionOpenInfo.newBuilder();
builder.setRegion(HRegionInfo.convert(region));
if (versionOfOfflineNode >= 0) {
@ -1504,6 +1510,9 @@ public final class RequestConverter {
builder.addFavoredNodes(ProtobufUtil.toServerName(server));
}
}
if (openForReplay != null) {
builder.setOpenForDistributedLogReplay(openForReplay);
}
return builder.build();
}
}
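
For the caller's side, here is a minimal sketch (not part of the commit) of how the new openForReplay argument flows into the request. Passing null leaves protobuf field 4 unset, so the resulting request is identical to what a pre-upgrade master sends; OpenForReplaySketch and its build helper are hypothetical names.

import java.util.List;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;

// Hypothetical helper class, for illustration only.
public class OpenForReplaySketch {
  // Marks the open as a distributed-log-replay open only when the master
  // currently runs in LOG_REPLAY mode; -1 means no ZK offline-node version.
  static OpenRegionRequest build(ServerName server, HRegionInfo region,
      List<ServerName> favoredNodes, boolean masterInLogReplayMode) {
    return RequestConverter.buildOpenRegionRequest(server, region, -1,
        favoredNodes, Boolean.valueOf(masterInLogReplayMode));
  }
}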

AdminProtos.java

@ -4032,6 +4032,24 @@ public final class AdminProtos {
*/
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getFavoredNodesOrBuilder(
int index);
// optional bool openForDistributedLogReplay = 4;
/**
* <code>optional bool openForDistributedLogReplay = 4;</code>
*
* <pre>
* open region for distributedLogReplay
* </pre>
*/
boolean hasOpenForDistributedLogReplay();
/**
* <code>optional bool openForDistributedLogReplay = 4;</code>
*
* <pre>
* open region for distributedLogReplay
* </pre>
*/
boolean getOpenForDistributedLogReplay();
}
/**
* Protobuf type {@code OpenRegionRequest.RegionOpenInfo}
@ -4110,6 +4128,11 @@ public final class AdminProtos {
favoredNodes_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.PARSER, extensionRegistry));
break;
}
case 32: {
bitField0_ |= 0x00000004;
openForDistributedLogReplay_ = input.readBool();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -4227,10 +4250,35 @@ public final class AdminProtos {
return favoredNodes_.get(index);
}
// optional bool openForDistributedLogReplay = 4;
public static final int OPENFORDISTRIBUTEDLOGREPLAY_FIELD_NUMBER = 4;
private boolean openForDistributedLogReplay_;
/**
* <code>optional bool openForDistributedLogReplay = 4;</code>
*
* <pre>
* open region for distributedLogReplay
* </pre>
*/
public boolean hasOpenForDistributedLogReplay() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional bool openForDistributedLogReplay = 4;</code>
*
* <pre>
* open region for distributedLogReplay
* </pre>
*/
public boolean getOpenForDistributedLogReplay() {
return openForDistributedLogReplay_;
}
private void initFields() {
region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo.getDefaultInstance();
versionOfOfflineNode_ = 0;
favoredNodes_ = java.util.Collections.emptyList();
openForDistributedLogReplay_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -4267,6 +4315,9 @@ public final class AdminProtos {
for (int i = 0; i < favoredNodes_.size(); i++) {
output.writeMessage(3, favoredNodes_.get(i));
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeBool(4, openForDistributedLogReplay_);
}
getUnknownFields().writeTo(output);
}
@ -4288,6 +4339,10 @@ public final class AdminProtos {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(3, favoredNodes_.get(i));
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(4, openForDistributedLogReplay_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -4323,6 +4378,11 @@ public final class AdminProtos {
}
result = result && getFavoredNodesList()
.equals(other.getFavoredNodesList());
result = result && (hasOpenForDistributedLogReplay() == other.hasOpenForDistributedLogReplay());
if (hasOpenForDistributedLogReplay()) {
result = result && (getOpenForDistributedLogReplay()
== other.getOpenForDistributedLogReplay());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@ -4348,6 +4408,10 @@ public final class AdminProtos {
hash = (37 * hash) + FAVORED_NODES_FIELD_NUMBER;
hash = (53 * hash) + getFavoredNodesList().hashCode();
}
if (hasOpenForDistributedLogReplay()) {
hash = (37 * hash) + OPENFORDISTRIBUTEDLOGREPLAY_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getOpenForDistributedLogReplay());
}
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@ -4473,6 +4537,8 @@ public final class AdminProtos {
} else {
favoredNodesBuilder_.clear();
}
openForDistributedLogReplay_ = false;
bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
@ -4522,6 +4588,10 @@ public final class AdminProtos {
} else {
result.favoredNodes_ = favoredNodesBuilder_.build();
}
if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
to_bitField0_ |= 0x00000004;
}
result.openForDistributedLogReplay_ = openForDistributedLogReplay_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -4570,6 +4640,9 @@ public final class AdminProtos {
}
}
}
if (other.hasOpenForDistributedLogReplay()) {
setOpenForDistributedLogReplay(other.getOpenForDistributedLogReplay());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -5001,6 +5074,55 @@ public final class AdminProtos {
return favoredNodesBuilder_;
}
// optional bool openForDistributedLogReplay = 4;
private boolean openForDistributedLogReplay_ ;
/**
* <code>optional bool openForDistributedLogReplay = 4;</code>
*
* <pre>
* open region for distributedLogReplay
* </pre>
*/
public boolean hasOpenForDistributedLogReplay() {
return ((bitField0_ & 0x00000008) == 0x00000008);
}
/**
* <code>optional bool openForDistributedLogReplay = 4;</code>
*
* <pre>
* open region for distributedLogReplay
* </pre>
*/
public boolean getOpenForDistributedLogReplay() {
return openForDistributedLogReplay_;
}
/**
* <code>optional bool openForDistributedLogReplay = 4;</code>
*
* <pre>
* open region for distributedLogReplay
* </pre>
*/
public Builder setOpenForDistributedLogReplay(boolean value) {
bitField0_ |= 0x00000008;
openForDistributedLogReplay_ = value;
onChanged();
return this;
}
/**
* <code>optional bool openForDistributedLogReplay = 4;</code>
*
* <pre>
* open region for distributedLogReplay
* </pre>
*/
public Builder clearOpenForDistributedLogReplay() {
bitField0_ = (bitField0_ & ~0x00000008);
openForDistributedLogReplay_ = false;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:OpenRegionRequest.RegionOpenInfo)
}
@ -21166,77 +21288,78 @@ public final class AdminProtos {
"FileResponse\022\022\n\nstore_file\030\001 \003(\t\"\030\n\026GetO" +
"nlineRegionRequest\";\n\027GetOnlineRegionRes" +
"ponse\022 \n\013region_info\030\001 \003(\0132\013.RegionInfo\"" +
"\326\001\n\021OpenRegionRequest\0224\n\topen_info\030\001 \003(\013" +
"\374\001\n\021OpenRegionRequest\0224\n\topen_info\030\001 \003(\013" +
"2!.OpenRegionRequest.RegionOpenInfo\022\027\n\017s" +
"erverStartCode\030\002 \001(\004\032r\n\016RegionOpenInfo\022\033" +
"\n\006region\030\001 \002(\0132\013.RegionInfo\022\037\n\027version_o" +
"f_offline_node\030\002 \001(\r\022\"\n\rfavored_nodes\030\003 " +
"\003(\0132\013.ServerName\"\235\001\n\022OpenRegionResponse\022",
"=\n\ropening_state\030\001 \003(\0162&.OpenRegionRespo" +
"nse.RegionOpeningState\"H\n\022RegionOpeningS" +
"tate\022\n\n\006OPENED\020\000\022\022\n\016ALREADY_OPENED\020\001\022\022\n\016" +
"FAILED_OPENING\020\002\"\271\001\n\022CloseRegionRequest\022" +
" \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\037\n\027ver" +
"sion_of_closing_node\030\002 \001(\r\022\036\n\020transition" +
"_in_ZK\030\003 \001(\010:\004true\022\'\n\022destination_server" +
"\030\004 \001(\0132\013.ServerName\022\027\n\017serverStartCode\030\005" +
" \001(\004\"%\n\023CloseRegionResponse\022\016\n\006closed\030\001 " +
"\002(\010\"P\n\022FlushRegionRequest\022 \n\006region\030\001 \002(",
"\0132\020.RegionSpecifier\022\030\n\020if_older_than_ts\030" +
"\002 \001(\004\"?\n\023FlushRegionResponse\022\027\n\017last_flu" +
"sh_time\030\001 \002(\004\022\017\n\007flushed\030\002 \001(\010\"K\n\022SplitR" +
"egionRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpe" +
"cifier\022\023\n\013split_point\030\002 \001(\014\"\025\n\023SplitRegi" +
"onResponse\"W\n\024CompactRegionRequest\022 \n\006re" +
"gion\030\001 \002(\0132\020.RegionSpecifier\022\r\n\005major\030\002 " +
"\001(\010\022\016\n\006family\030\003 \001(\014\"\027\n\025CompactRegionResp" +
"onse\"\262\001\n\031UpdateFavoredNodesRequest\022@\n\013up" +
"date_info\030\001 \003(\0132+.UpdateFavoredNodesRequ",
"est.RegionUpdateInfo\032S\n\020RegionUpdateInfo" +
"\022\033\n\006region\030\001 \002(\0132\013.RegionInfo\022\"\n\rfavored" +
"_nodes\030\002 \003(\0132\013.ServerName\".\n\032UpdateFavor" +
"edNodesResponse\022\020\n\010response\030\001 \001(\r\"v\n\023Mer" +
"geRegionsRequest\022\"\n\010region_a\030\001 \002(\0132\020.Reg" +
"ionSpecifier\022\"\n\010region_b\030\002 \002(\0132\020.RegionS" +
"pecifier\022\027\n\010forcible\030\003 \001(\010:\005false\"\026\n\024Mer" +
"geRegionsResponse\"X\n\010WALEntry\022\024\n\003key\030\001 \002" +
"(\0132\007.WALKey\022\027\n\017key_value_bytes\030\002 \003(\014\022\035\n\025" +
"associated_cell_count\030\003 \001(\005\"4\n\030Replicate",
"WALEntryRequest\022\030\n\005entry\030\001 \003(\0132\t.WALEntr" +
"y\"\033\n\031ReplicateWALEntryResponse\"\026\n\024RollWA" +
"LWriterRequest\"0\n\025RollWALWriterResponse\022" +
"\027\n\017region_to_flush\030\001 \003(\014\"#\n\021StopServerRe" +
"quest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServerRespo" +
"nse\"\026\n\024GetServerInfoRequest\"B\n\nServerInf" +
"o\022 \n\013server_name\030\001 \002(\0132\013.ServerName\022\022\n\nw" +
"ebui_port\030\002 \001(\r\"9\n\025GetServerInfoResponse" +
"\022 \n\013server_info\030\001 \002(\0132\013.ServerInfo2\306\007\n\014A" +
"dminService\022>\n\rGetRegionInfo\022\025.GetRegion",
"InfoRequest\032\026.GetRegionInfoResponse\022;\n\014G" +
"etStoreFile\022\024.GetStoreFileRequest\032\025.GetS" +
"toreFileResponse\022D\n\017GetOnlineRegion\022\027.Ge" +
"tOnlineRegionRequest\032\030.GetOnlineRegionRe" +
"sponse\0225\n\nOpenRegion\022\022.OpenRegionRequest" +
"\032\023.OpenRegionResponse\0228\n\013CloseRegion\022\023.C" +
"loseRegionRequest\032\024.CloseRegionResponse\022" +
"8\n\013FlushRegion\022\023.FlushRegionRequest\032\024.Fl" +
"ushRegionResponse\0228\n\013SplitRegion\022\023.Split" +
"RegionRequest\032\024.SplitRegionResponse\022>\n\rC",
"ompactRegion\022\025.CompactRegionRequest\032\026.Co" +
"mpactRegionResponse\022;\n\014MergeRegions\022\024.Me" +
"rgeRegionsRequest\032\025.MergeRegionsResponse" +
"\022J\n\021ReplicateWALEntry\022\031.ReplicateWALEntr" +
"yRequest\032\032.ReplicateWALEntryResponse\022?\n\006" +
"Replay\022\031.ReplicateWALEntryRequest\032\032.Repl" +
"icateWALEntryResponse\022>\n\rRollWALWriter\022\025" +
".RollWALWriterRequest\032\026.RollWALWriterRes" +
"ponse\022>\n\rGetServerInfo\022\025.GetServerInfoRe" +
"quest\032\026.GetServerInfoResponse\0225\n\nStopSer",
"ver\022\022.StopServerRequest\032\023.StopServerResp" +
"onse\022M\n\022UpdateFavoredNodes\022\032.UpdateFavor" +
"edNodesRequest\032\033.UpdateFavoredNodesRespo" +
"nseBA\n*org.apache.hadoop.hbase.protobuf." +
"generatedB\013AdminProtosH\001\210\001\001\240\001\001"
"erverStartCode\030\002 \001(\004\032\227\001\n\016RegionOpenInfo\022" +
"\033\n\006region\030\001 \002(\0132\013.RegionInfo\022\037\n\027version_" +
"of_offline_node\030\002 \001(\r\022\"\n\rfavored_nodes\030\003" +
" \003(\0132\013.ServerName\022#\n\033openForDistributedL",
"ogReplay\030\004 \001(\010\"\235\001\n\022OpenRegionResponse\022=\n" +
"\ropening_state\030\001 \003(\0162&.OpenRegionRespons" +
"e.RegionOpeningState\"H\n\022RegionOpeningSta" +
"te\022\n\n\006OPENED\020\000\022\022\n\016ALREADY_OPENED\020\001\022\022\n\016FA" +
"ILED_OPENING\020\002\"\271\001\n\022CloseRegionRequest\022 \n" +
"\006region\030\001 \002(\0132\020.RegionSpecifier\022\037\n\027versi" +
"on_of_closing_node\030\002 \001(\r\022\036\n\020transition_i" +
"n_ZK\030\003 \001(\010:\004true\022\'\n\022destination_server\030\004" +
" \001(\0132\013.ServerName\022\027\n\017serverStartCode\030\005 \001" +
"(\004\"%\n\023CloseRegionResponse\022\016\n\006closed\030\001 \002(",
"\010\"P\n\022FlushRegionRequest\022 \n\006region\030\001 \002(\0132" +
"\020.RegionSpecifier\022\030\n\020if_older_than_ts\030\002 " +
"\001(\004\"?\n\023FlushRegionResponse\022\027\n\017last_flush" +
"_time\030\001 \002(\004\022\017\n\007flushed\030\002 \001(\010\"K\n\022SplitReg" +
"ionRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpeci" +
"fier\022\023\n\013split_point\030\002 \001(\014\"\025\n\023SplitRegion" +
"Response\"W\n\024CompactRegionRequest\022 \n\006regi" +
"on\030\001 \002(\0132\020.RegionSpecifier\022\r\n\005major\030\002 \001(" +
"\010\022\016\n\006family\030\003 \001(\014\"\027\n\025CompactRegionRespon" +
"se\"\262\001\n\031UpdateFavoredNodesRequest\022@\n\013upda",
"te_info\030\001 \003(\0132+.UpdateFavoredNodesReques" +
"t.RegionUpdateInfo\032S\n\020RegionUpdateInfo\022\033" +
"\n\006region\030\001 \002(\0132\013.RegionInfo\022\"\n\rfavored_n" +
"odes\030\002 \003(\0132\013.ServerName\".\n\032UpdateFavored" +
"NodesResponse\022\020\n\010response\030\001 \001(\r\"v\n\023Merge" +
"RegionsRequest\022\"\n\010region_a\030\001 \002(\0132\020.Regio" +
"nSpecifier\022\"\n\010region_b\030\002 \002(\0132\020.RegionSpe" +
"cifier\022\027\n\010forcible\030\003 \001(\010:\005false\"\026\n\024Merge" +
"RegionsResponse\"X\n\010WALEntry\022\024\n\003key\030\001 \002(\013" +
"2\007.WALKey\022\027\n\017key_value_bytes\030\002 \003(\014\022\035\n\025as",
"sociated_cell_count\030\003 \001(\005\"4\n\030ReplicateWA" +
"LEntryRequest\022\030\n\005entry\030\001 \003(\0132\t.WALEntry\"" +
"\033\n\031ReplicateWALEntryResponse\"\026\n\024RollWALW" +
"riterRequest\"0\n\025RollWALWriterResponse\022\027\n" +
"\017region_to_flush\030\001 \003(\014\"#\n\021StopServerRequ" +
"est\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServerRespons" +
"e\"\026\n\024GetServerInfoRequest\"B\n\nServerInfo\022" +
" \n\013server_name\030\001 \002(\0132\013.ServerName\022\022\n\nweb" +
"ui_port\030\002 \001(\r\"9\n\025GetServerInfoResponse\022 " +
"\n\013server_info\030\001 \002(\0132\013.ServerInfo2\306\007\n\014Adm",
"inService\022>\n\rGetRegionInfo\022\025.GetRegionIn" +
"foRequest\032\026.GetRegionInfoResponse\022;\n\014Get" +
"StoreFile\022\024.GetStoreFileRequest\032\025.GetSto" +
"reFileResponse\022D\n\017GetOnlineRegion\022\027.GetO" +
"nlineRegionRequest\032\030.GetOnlineRegionResp" +
"onse\0225\n\nOpenRegion\022\022.OpenRegionRequest\032\023" +
".OpenRegionResponse\0228\n\013CloseRegion\022\023.Clo" +
"seRegionRequest\032\024.CloseRegionResponse\0228\n" +
"\013FlushRegion\022\023.FlushRegionRequest\032\024.Flus" +
"hRegionResponse\0228\n\013SplitRegion\022\023.SplitRe",
"gionRequest\032\024.SplitRegionResponse\022>\n\rCom" +
"pactRegion\022\025.CompactRegionRequest\032\026.Comp" +
"actRegionResponse\022;\n\014MergeRegions\022\024.Merg" +
"eRegionsRequest\032\025.MergeRegionsResponse\022J" +
"\n\021ReplicateWALEntry\022\031.ReplicateWALEntryR" +
"equest\032\032.ReplicateWALEntryResponse\022?\n\006Re" +
"play\022\031.ReplicateWALEntryRequest\032\032.Replic" +
"ateWALEntryResponse\022>\n\rRollWALWriter\022\025.R" +
"ollWALWriterRequest\032\026.RollWALWriterRespo" +
"nse\022>\n\rGetServerInfo\022\025.GetServerInfoRequ",
"est\032\026.GetServerInfoResponse\0225\n\nStopServe" +
"r\022\022.StopServerRequest\032\023.StopServerRespon" +
"se\022M\n\022UpdateFavoredNodes\022\032.UpdateFavored" +
"NodesRequest\032\033.UpdateFavoredNodesRespons" +
"eBA\n*org.apache.hadoop.hbase.protobuf.ge" +
"neratedB\013AdminProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -21290,7 +21413,7 @@ public final class AdminProtos {
internal_static_OpenRegionRequest_RegionOpenInfo_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_OpenRegionRequest_RegionOpenInfo_descriptor,
new java.lang.String[] { "Region", "VersionOfOfflineNode", "FavoredNodes", });
new java.lang.String[] { "Region", "VersionOfOfflineNode", "FavoredNodes", "OpenForDistributedLogReplay", });
internal_static_OpenRegionResponse_descriptor =
getDescriptor().getMessageTypes().get(7);
internal_static_OpenRegionResponse_fieldAccessorTable = new

ZooKeeperProtos.java

@ -3230,6 +3230,16 @@ public final class ZooKeeperProtos {
* <code>required .ServerName server_name = 2;</code>
*/
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getServerNameOrBuilder();
// optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];
/**
* <code>optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];</code>
*/
boolean hasMode();
/**
* <code>optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];</code>
*/
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode getMode();
}
/**
* Protobuf type {@code SplitLogTask}
@ -3312,6 +3322,17 @@ public final class ZooKeeperProtos {
bitField0_ |= 0x00000002;
break;
}
case 24: {
int rawValue = input.readEnum();
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode value = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode.valueOf(rawValue);
if (value == null) {
unknownFields.mergeVarintField(3, rawValue);
} else {
bitField0_ |= 0x00000004;
mode_ = value;
}
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -3460,6 +3481,97 @@ public final class ZooKeeperProtos {
// @@protoc_insertion_point(enum_scope:SplitLogTask.State)
}
/**
* Protobuf enum {@code SplitLogTask.RecoveryMode}
*/
public enum RecoveryMode
implements com.google.protobuf.ProtocolMessageEnum {
/**
* <code>UNKNOWN = 0;</code>
*/
UNKNOWN(0, 0),
/**
* <code>LOG_SPLITTING = 1;</code>
*/
LOG_SPLITTING(1, 1),
/**
* <code>LOG_REPLAY = 2;</code>
*/
LOG_REPLAY(2, 2),
;
/**
* <code>UNKNOWN = 0;</code>
*/
public static final int UNKNOWN_VALUE = 0;
/**
* <code>LOG_SPLITTING = 1;</code>
*/
public static final int LOG_SPLITTING_VALUE = 1;
/**
* <code>LOG_REPLAY = 2;</code>
*/
public static final int LOG_REPLAY_VALUE = 2;
public final int getNumber() { return value; }
public static RecoveryMode valueOf(int value) {
switch (value) {
case 0: return UNKNOWN;
case 1: return LOG_SPLITTING;
case 2: return LOG_REPLAY;
default: return null;
}
}
public static com.google.protobuf.Internal.EnumLiteMap<RecoveryMode>
internalGetValueMap() {
return internalValueMap;
}
private static com.google.protobuf.Internal.EnumLiteMap<RecoveryMode>
internalValueMap =
new com.google.protobuf.Internal.EnumLiteMap<RecoveryMode>() {
public RecoveryMode findValueByNumber(int number) {
return RecoveryMode.valueOf(number);
}
};
public final com.google.protobuf.Descriptors.EnumValueDescriptor
getValueDescriptor() {
return getDescriptor().getValues().get(index);
}
public final com.google.protobuf.Descriptors.EnumDescriptor
getDescriptorForType() {
return getDescriptor();
}
public static final com.google.protobuf.Descriptors.EnumDescriptor
getDescriptor() {
return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.getDescriptor().getEnumTypes().get(1);
}
private static final RecoveryMode[] VALUES = values();
public static RecoveryMode valueOf(
com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
if (desc.getType() != getDescriptor()) {
throw new java.lang.IllegalArgumentException(
"EnumValueDescriptor is not for this type.");
}
return VALUES[desc.getIndex()];
}
private final int index;
private final int value;
private RecoveryMode(int index, int value) {
this.index = index;
this.value = value;
}
// @@protoc_insertion_point(enum_scope:SplitLogTask.RecoveryMode)
}
private int bitField0_;
// required .SplitLogTask.State state = 1;
public static final int STATE_FIELD_NUMBER = 1;
@ -3499,9 +3611,26 @@ public final class ZooKeeperProtos {
return serverName_;
}
// optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];
public static final int MODE_FIELD_NUMBER = 3;
private org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode mode_;
/**
* <code>optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];</code>
*/
public boolean hasMode() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];</code>
*/
public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode getMode() {
return mode_;
}
private void initFields() {
state_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State.UNASSIGNED;
serverName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
mode_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode.UNKNOWN;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -3533,6 +3662,9 @@ public final class ZooKeeperProtos {
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeMessage(2, serverName_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeEnum(3, mode_.getNumber());
}
getUnknownFields().writeTo(output);
}
@ -3550,6 +3682,10 @@ public final class ZooKeeperProtos {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(2, serverName_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
size += com.google.protobuf.CodedOutputStream
.computeEnumSize(3, mode_.getNumber());
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -3583,6 +3719,11 @@ public final class ZooKeeperProtos {
result = result && getServerName()
.equals(other.getServerName());
}
result = result && (hasMode() == other.hasMode());
if (hasMode()) {
result = result &&
(getMode() == other.getMode());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@ -3604,6 +3745,10 @@ public final class ZooKeeperProtos {
hash = (37 * hash) + SERVER_NAME_FIELD_NUMBER;
hash = (53 * hash) + getServerName().hashCode();
}
if (hasMode()) {
hash = (37 * hash) + MODE_FIELD_NUMBER;
hash = (53 * hash) + hashEnum(getMode());
}
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@ -3728,6 +3873,8 @@ public final class ZooKeeperProtos {
serverNameBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000002);
mode_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode.UNKNOWN;
bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
@ -3768,6 +3915,10 @@ public final class ZooKeeperProtos {
} else {
result.serverName_ = serverNameBuilder_.build();
}
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
to_bitField0_ |= 0x00000004;
}
result.mode_ = mode_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -3790,6 +3941,9 @@ public final class ZooKeeperProtos {
if (other.hasServerName()) {
mergeServerName(other.getServerName());
}
if (other.hasMode()) {
setMode(other.getMode());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -3982,6 +4136,42 @@ public final class ZooKeeperProtos {
return serverNameBuilder_;
}
// optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];
private org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode mode_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode.UNKNOWN;
/**
* <code>optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];</code>
*/
public boolean hasMode() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];</code>
*/
public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode getMode() {
return mode_;
}
/**
* <code>optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];</code>
*/
public Builder setMode(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000004;
mode_ = value;
onChanged();
return this;
}
/**
* <code>optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];</code>
*/
public Builder clearMode() {
bitField0_ = (bitField0_ & ~0x00000004);
mode_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode.UNKNOWN;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:SplitLogTask)
}
@ -9399,29 +9589,32 @@ public final class ZooKeeperProtos {
"gionTransition\022\027\n\017event_type_code\030\001 \002(\r\022" +
"\023\n\013region_name\030\002 \002(\014\022\023\n\013create_time\030\003 \002(" +
"\004\022 \n\013server_name\030\004 \002(\0132\013.ServerName\022\017\n\007p" +
"ayload\030\005 \001(\014\"\231\001\n\014SplitLogTask\022\"\n\005state\030\001" +
"ayload\030\005 \001(\014\"\214\002\n\014SplitLogTask\022\"\n\005state\030\001" +
" \002(\0162\023.SplitLogTask.State\022 \n\013server_name",
"\030\002 \002(\0132\013.ServerName\"C\n\005State\022\016\n\nUNASSIGN" +
"ED\020\000\022\t\n\005OWNED\020\001\022\014\n\010RESIGNED\020\002\022\010\n\004DONE\020\003\022" +
"\007\n\003ERR\020\004\"n\n\005Table\022$\n\005state\030\001 \002(\0162\014.Table" +
".State:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n" +
"\010DISABLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003" +
"\"%\n\017ReplicationPeer\022\022\n\nclusterkey\030\001 \002(\t\"" +
"^\n\020ReplicationState\022&\n\005state\030\001 \002(\0162\027.Rep" +
"licationState.State\"\"\n\005State\022\013\n\007ENABLED\020" +
"\000\022\014\n\010DISABLED\020\001\"+\n\027ReplicationHLogPositi" +
"on\022\020\n\010position\030\001 \002(\003\"%\n\017ReplicationLock\022",
"\022\n\nlock_owner\030\001 \002(\t\"\230\001\n\tTableLock\022\036\n\ntab" +
"le_name\030\001 \001(\0132\n.TableName\022\037\n\nlock_owner\030" +
"\002 \001(\0132\013.ServerName\022\021\n\tthread_id\030\003 \001(\003\022\021\n" +
"\tis_shared\030\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013cre" +
"ate_time\030\006 \001(\003\";\n\017StoreSequenceId\022\023\n\013fam" +
"ily_name\030\001 \002(\014\022\023\n\013sequence_id\030\002 \002(\004\"g\n\026R" +
"egionStoreSequenceIds\022 \n\030last_flushed_se" +
"quence_id\030\001 \002(\004\022+\n\021store_sequence_id\030\002 \003" +
"(\0132\020.StoreSequenceIdBE\n*org.apache.hadoo" +
"p.hbase.protobuf.generatedB\017ZooKeeperPro",
"tosH\001\210\001\001\240\001\001"
"\030\002 \002(\0132\013.ServerName\0221\n\004mode\030\003 \001(\0162\032.Spli" +
"tLogTask.RecoveryMode:\007UNKNOWN\"C\n\005State\022" +
"\016\n\nUNASSIGNED\020\000\022\t\n\005OWNED\020\001\022\014\n\010RESIGNED\020\002" +
"\022\010\n\004DONE\020\003\022\007\n\003ERR\020\004\">\n\014RecoveryMode\022\013\n\007U" +
"NKNOWN\020\000\022\021\n\rLOG_SPLITTING\020\001\022\016\n\nLOG_REPLA" +
"Y\020\002\"n\n\005Table\022$\n\005state\030\001 \002(\0162\014.Table.Stat" +
"e:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" +
"BLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"%\n\017R" +
"eplicationPeer\022\022\n\nclusterkey\030\001 \002(\t\"^\n\020Re" +
"plicationState\022&\n\005state\030\001 \002(\0162\027.Replicat",
"ionState.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010" +
"DISABLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n" +
"\010position\030\001 \002(\003\"%\n\017ReplicationLock\022\022\n\nlo" +
"ck_owner\030\001 \002(\t\"\230\001\n\tTableLock\022\036\n\ntable_na" +
"me\030\001 \001(\0132\n.TableName\022\037\n\nlock_owner\030\002 \001(\013" +
"2\013.ServerName\022\021\n\tthread_id\030\003 \001(\003\022\021\n\tis_s" +
"hared\030\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013create_t" +
"ime\030\006 \001(\003\";\n\017StoreSequenceId\022\023\n\013family_n" +
"ame\030\001 \002(\014\022\023\n\013sequence_id\030\002 \002(\004\"g\n\026Region" +
"StoreSequenceIds\022 \n\030last_flushed_sequenc",
"e_id\030\001 \002(\004\022+\n\021store_sequence_id\030\002 \003(\0132\020." +
"StoreSequenceIdBE\n*org.apache.hadoop.hba" +
"se.protobuf.generatedB\017ZooKeeperProtosH\001" +
"\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -9457,7 +9650,7 @@ public final class ZooKeeperProtos {
internal_static_SplitLogTask_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_SplitLogTask_descriptor,
new java.lang.String[] { "State", "ServerName", });
new java.lang.String[] { "State", "ServerName", "Mode", });
internal_static_Table_descriptor =
getDescriptor().getMessageTypes().get(5);
internal_static_Table_fieldAccessorTable = new

Admin.proto

@ -75,6 +75,8 @@ message OpenRegionRequest {
required RegionInfo region = 1;
optional uint32 version_of_offline_node = 2;
repeated ServerName favored_nodes = 3;
// open region for distributedLogReplay
optional bool openForDistributedLogReplay = 4;
}
}
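
Because field 4 is optional, the message stays wire-compatible in both directions: an old master never sets it, and an old region server treats it as an unknown field. A minimal sketch (assumed helper, not in the commit) of the guarded read on the upgraded server side:

import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;

// Hypothetical helper class, for illustration only.
public class RegionOpenInfoCompat {
  // An unset field (request from an old master) is treated as a normal open;
  // only an explicit true means a distributed-log-replay open.
  static boolean isReplayOpen(RegionOpenInfo info) {
    return info.hasOpenForDistributedLogReplay()
        && info.getOpenForDistributedLogReplay();
  }
}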

ZooKeeper.proto

@ -85,8 +85,14 @@ message SplitLogTask {
DONE = 3;
ERR = 4;
}
enum RecoveryMode {
UNKNOWN = 0;
LOG_SPLITTING = 1;
LOG_REPLAY = 2;
}
required State state = 1;
required ServerName server_name = 2;
optional RecoveryMode mode = 3 [default = UNKNOWN];
}
/**

SplitLogTask.java

@ -18,10 +18,13 @@
package org.apache.hadoop.hbase;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.protobuf.InvalidProtocolBufferException;
@ -36,49 +39,59 @@ import com.google.protobuf.InvalidProtocolBufferException;
public class SplitLogTask {
private final ServerName originServer;
private final ZooKeeperProtos.SplitLogTask.State state;
private final ZooKeeperProtos.SplitLogTask.RecoveryMode mode;
public static class Unassigned extends SplitLogTask {
public Unassigned(final ServerName originServer) {
super(originServer, ZooKeeperProtos.SplitLogTask.State.UNASSIGNED);
public Unassigned(final ServerName originServer, final RecoveryMode mode) {
super(originServer, ZooKeeperProtos.SplitLogTask.State.UNASSIGNED, mode);
}
}
public static class Owned extends SplitLogTask {
public Owned(final ServerName originServer) {
super(originServer, ZooKeeperProtos.SplitLogTask.State.OWNED);
public Owned(final ServerName originServer, final RecoveryMode mode) {
super(originServer, ZooKeeperProtos.SplitLogTask.State.OWNED, mode);
}
}
public static class Resigned extends SplitLogTask {
public Resigned(final ServerName originServer) {
super(originServer, ZooKeeperProtos.SplitLogTask.State.RESIGNED);
public Resigned(final ServerName originServer, final RecoveryMode mode) {
super(originServer, ZooKeeperProtos.SplitLogTask.State.RESIGNED, mode);
}
}
public static class Done extends SplitLogTask {
public Done(final ServerName originServer) {
super(originServer, ZooKeeperProtos.SplitLogTask.State.DONE);
public Done(final ServerName originServer, final RecoveryMode mode) {
super(originServer, ZooKeeperProtos.SplitLogTask.State.DONE, mode);
}
}
public static class Err extends SplitLogTask {
public Err(final ServerName originServer) {
super(originServer, ZooKeeperProtos.SplitLogTask.State.ERR);
public Err(final ServerName originServer, final RecoveryMode mode) {
super(originServer, ZooKeeperProtos.SplitLogTask.State.ERR, mode);
}
}
SplitLogTask(final ZooKeeperProtos.SplitLogTask slt) {
this(ProtobufUtil.toServerName(slt.getServerName()), slt.getState());
this.originServer = ProtobufUtil.toServerName(slt.getServerName());
this.state = slt.getState();
this.mode = (slt.hasMode()) ? slt.getMode() :
ZooKeeperProtos.SplitLogTask.RecoveryMode.UNKNOWN;
}
SplitLogTask(final ServerName originServer, final ZooKeeperProtos.SplitLogTask.State state) {
SplitLogTask(final ServerName originServer, final ZooKeeperProtos.SplitLogTask.State state,
final ZooKeeperProtos.SplitLogTask.RecoveryMode mode) {
this.originServer = originServer;
this.state = state;
this.mode = mode;
}
public ServerName getServerName() {
return this.originServer;
}
public ZooKeeperProtos.SplitLogTask.RecoveryMode getMode() {
return this.mode;
}
public boolean isUnassigned(final ServerName sn) {
return this.originServer.equals(sn) && isUnassigned();
@ -167,7 +180,8 @@ public class SplitLogTask {
// pbs just created.
HBaseProtos.ServerName snpb = ProtobufUtil.toServerName(this.originServer);
ZooKeeperProtos.SplitLogTask slts =
ZooKeeperProtos.SplitLogTask.newBuilder().setServerName(snpb).setState(this.state).build();
ZooKeeperProtos.SplitLogTask.newBuilder().setServerName(snpb).setState(this.state).
setMode(this.mode).build();
return ProtobufUtil.prependPBMagic(slts.toByteArray());
}
}
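
A hedged round-trip sketch (not in the commit): the recovery mode now travels inside the split-log task's znode payload, and, per the deserializing constructor above, data written by an old master parses with RecoveryMode.UNKNOWN instead of failing. The server name below is a made-up example.

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;

public class SplitLogTaskRoundTrip {
  public static void main(String[] args) throws DeserializationException {
    ServerName master = ServerName.valueOf("master.example.org", 60000, 1L);
    SplitLogTask slt = new SplitLogTask.Unassigned(master, RecoveryMode.LOG_SPLITTING);
    byte[] znodeData = slt.toByteArray();   // pb-magic-prefixed bytes stored in ZK
    SplitLogTask parsed = SplitLogTask.parseFrom(znodeData);
    assert parsed.getMode() == RecoveryMode.LOG_SPLITTING;
  }
}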

HMaster.java

@ -102,6 +102,7 @@ import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
@ -711,7 +712,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
enableMeta(TableName.META_TABLE_NAME);
if (this.distributedLogReplay && (!previouslyFailedMetaRSs.isEmpty())) {
if ((RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode())
&& (!previouslyFailedMetaRSs.isEmpty())) {
// replay-WAL-edits mode needs the new hbase:meta RS to be assigned first
status.setStatus("replaying log for Meta Region");
this.fileSystemManager.splitMetaLog(previouslyFailedMetaRSs);
@ -740,7 +742,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
}
private void splitMetaLogBeforeAssignment(ServerName currentMetaServer) throws IOException {
if (this.distributedLogReplay) {
if (RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode()) {
// In log replay mode, we mark hbase:meta region as recovering in ZK
Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
regions.add(HRegionInfo.FIRST_META_REGIONINFO);

MasterFileSystem.java

@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
@ -122,14 +123,18 @@ public class MasterFileSystem {
FSUtils.setFsDefault(conf, new Path(this.fs.getUri()));
// make sure the fs has the same conf
fs.setConf(conf);
this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf);
// setup the filesystem variable
// set up the archived logs path
this.oldLogDir = createInitialFileSystemLayout();
HFileSystem.addLocationsOrderInterceptor(conf);
this.splitLogManager = new SplitLogManager(master.getZooKeeper(),
master.getConfiguration(), master, services,
master.getServerName());
try {
this.splitLogManager = new SplitLogManager(master.getZooKeeper(),
master.getConfiguration(), master, services,
master.getServerName());
} catch (KeeperException e) {
throw new IOException(e);
}
this.distributedLogReplay = (this.splitLogManager.getRecoveryMode() == RecoveryMode.LOG_REPLAY);
}
/**
@ -682,4 +687,22 @@ public class MasterFileSystem {
}
return null;
}
/**
* Used in SSH (ServerShutdownHandler) to set the recovery mode based on configuration,
* after all outstanding log split tasks have drained.
* @throws IOException if the recovery mode cannot be updated
*/
public void setLogRecoveryMode() throws IOException {
try {
this.splitLogManager.setRecoveryMode(false);
} catch (KeeperException e) {
throw new IOException(e);
}
}
public RecoveryMode getLogRecoveryMode() {
return this.splitLogManager.getRecoveryMode();
}
}

ServerManager.java

@ -60,8 +60,10 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Triple;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@ -728,8 +730,9 @@ public class ServerManager {
" failed because no RPC connection found to this server");
return RegionOpeningState.FAILED_OPENING;
}
OpenRegionRequest request =
RequestConverter.buildOpenRegionRequest(server, region, versionOfOfflineNode, favoredNodes);
OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server,
region, versionOfOfflineNode, favoredNodes,
(RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
try {
OpenRegionResponse response = admin.openRegion(null, request);
return ResponseConverter.getRegionOpeningState(response);
@ -757,8 +760,8 @@ public class ServerManager {
return null;
}
OpenRegionRequest request =
RequestConverter.buildOpenRegionRequest(regionOpenInfos);
OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(regionOpenInfos,
(RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
try {
OpenRegionResponse response = admin.openRegion(null, request);
return ResponseConverter.getRegionOpeningStateList(response);

SplitLogManager.java

@ -46,16 +46,19 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
@ -138,7 +141,8 @@ public class SplitLogManager extends ZooKeeperListener {
*/
protected final ReentrantLock recoveringRegionLock = new ReentrantLock();
final boolean distributedLogReplay;
private volatile RecoveryMode recoveryMode;
private volatile boolean isDrainingDone = false;
private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
private TimeoutMonitor timeoutMonitor;
@ -160,9 +164,12 @@ public class SplitLogManager extends ZooKeeperListener {
* @param stopper the stoppable in case anything is wrong
* @param master the master services
* @param serverName the master server name
* @throws KeeperException
* @throws InterruptedIOException
*/
public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
Stoppable stopper, MasterServices master, ServerName serverName) {
Stoppable stopper, MasterServices master, ServerName serverName)
throws InterruptedIOException, KeeperException {
this(zkw, conf, stopper, master, serverName, new TaskFinisher() {
@Override
public Status finish(ServerName workerName, String logfile) {
@ -178,20 +185,20 @@ public class SplitLogManager extends ZooKeeperListener {
}
/**
* Its OK to construct this object even when region-servers are not online. It
* does lookup the orphan tasks in zk but it doesn't block waiting for them
* to be done.
*
* It's OK to construct this object even when region servers are not online. It does look up
* orphan tasks in ZK but it doesn't block waiting for them to be done.
* @param zkw the ZK watcher
* @param conf the HBase configuration
* @param stopper the stoppable in case anything is wrong
* @param master the master services
* @param serverName the master server name
* @param tf task finisher
* @throws KeeperException
* @throws InterruptedIOException
*/
public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
Stoppable stopper, MasterServices master,
ServerName serverName, TaskFinisher tf) {
public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf, Stoppable stopper,
MasterServices master, ServerName serverName, TaskFinisher tf) throws InterruptedIOException,
KeeperException {
super(zkw);
this.taskFinisher = tf;
this.conf = conf;
@ -202,9 +209,12 @@ public class SplitLogManager extends ZooKeeperListener {
this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT);
this.unassignedTimeout =
conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(conf);
// Determine recovery mode
setRecoveryMode(true);
LOG.info("Timeout=" + timeout + ", unassigned timeout=" + unassignedTimeout +
", distributedLogReplay=" + this.distributedLogReplay);
", distributedLogReplay=" + (this.recoveryMode == RecoveryMode.LOG_REPLAY));
this.serverName = serverName;
this.timeoutMonitor = new TimeoutMonitor(
@ -465,8 +475,7 @@ public class SplitLogManager extends ZooKeeperListener {
*/
private void
removeRecoveringRegionsFromZK(final Set<ServerName> serverNames, Boolean isMetaRecovery) {
if (!this.distributedLogReplay) {
if (this.recoveryMode != RecoveryMode.LOG_REPLAY) {
// the function is only used in WALEdit direct replay mode
return;
}
@ -494,7 +503,7 @@ public class SplitLogManager extends ZooKeeperListener {
if (count == 0 && this.master.isInitialized()
&& !this.master.getServerManager().areDeadServersInProgress()) {
// no splitting work items left
deleteRecoveringRegionZNodes(null);
deleteRecoveringRegionZNodes(watcher, null);
// reset lastRecoveringNodeCreationTime because we cleared all recovering znodes at
// this point.
lastRecoveringNodeCreationTime = Long.MAX_VALUE;
@ -550,14 +559,6 @@ public class SplitLogManager extends ZooKeeperListener {
void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers)
throws KeeperException, InterruptedIOException {
if (!this.distributedLogReplay) {
// remove any regions in recovery from ZK which could happen when we turn the feature on
// and later turn it off
ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode);
// the function is only used in distributedLogReplay mode when master is in initialization
return;
}
Set<String> knownFailedServers = new HashSet<String>();
if (failedServers != null) {
for (ServerName tmpServerName : failedServers) {
@ -625,7 +626,7 @@ public class SplitLogManager extends ZooKeeperListener {
}
}
private void deleteRecoveringRegionZNodes(List<String> regions) {
public static void deleteRecoveringRegionZNodes(ZooKeeperWatcher watcher, List<String> regions) {
try {
if (regions == null) {
// remove all children under /home/recovering-regions
@ -683,7 +684,7 @@ public class SplitLogManager extends ZooKeeperListener {
}
private void createNode(String path, Long retry_count) {
SplitLogTask slt = new SplitLogTask.Unassigned(serverName);
SplitLogTask slt = new SplitLogTask.Unassigned(serverName, this.recoveryMode);
ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(), retry_count);
SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
return;
@ -858,7 +859,7 @@ public class SplitLogManager extends ZooKeeperListener {
task.incarnation++;
try {
// blocking zk call but this is done from the timeout thread
SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName);
SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName, this.recoveryMode);
if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) {
LOG.debug("failed to resubmit task " + path +
" version changed");
@ -951,7 +952,7 @@ public class SplitLogManager extends ZooKeeperListener {
// Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks
// therefore this behavior is safe.
lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis();
SplitLogTask slt = new SplitLogTask.Done(this.serverName);
SplitLogTask slt = new SplitLogTask.Done(this.serverName, this.recoveryMode);
this.watcher.getRecoverableZooKeeper().getZooKeeper().
create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
@ -1099,7 +1100,7 @@ public class SplitLogManager extends ZooKeeperListener {
*/
void markRegionsRecoveringInZK(final ServerName serverName, Set<HRegionInfo> userRegions)
throws KeeperException, InterruptedIOException {
if (userRegions == null || !this.distributedLogReplay) {
if (userRegions == null || (this.recoveryMode != RecoveryMode.LOG_REPLAY)) {
return;
}
@ -1242,6 +1243,111 @@ public class SplitLogManager extends ZooKeeperListener {
}
return result;
}
/**
* Sets the recovery mode, either from outstanding split log tasks left over from a previous
* run or from the current configuration setting.
* @param isForInitialization true when called during master initialization
* @throws KeeperException
* @throws InterruptedIOException
*/
public void setRecoveryMode(boolean isForInitialization) throws KeeperException,
InterruptedIOException {
if (this.isDrainingDone) {
// when there are no outstanding split log tasks after master startup, we already have the
// up-to-date recovery mode
return;
}
if (this.watcher == null) {
// when watcher is null (testing code), the recovery mode can only be LOG_SPLITTING
this.isDrainingDone = true;
this.recoveryMode = RecoveryMode.LOG_SPLITTING;
return;
}
boolean hasSplitLogTask = false;
boolean hasRecoveringRegions = false;
RecoveryMode previousRecoveryMode = RecoveryMode.UNKNOWN;
RecoveryMode recoveryModeInConfig = (isDistributedLogReplay(conf)) ?
RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING;
// First, check if there are outstanding recovering regions
List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
if (regions != null && !regions.isEmpty()) {
hasRecoveringRegions = true;
previousRecoveryMode = RecoveryMode.LOG_REPLAY;
}
if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
// Second, check if there are outstanding split log tasks
List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
if (tasks != null && !tasks.isEmpty()) {
hasSplitLogTask = true;
if (isForInitialization) {
// during initialization, try to get recovery mode from splitlogtask
for (String task : tasks) {
try {
byte[] data = ZKUtil.getData(this.watcher,
ZKUtil.joinZNode(watcher.splitLogZNode, task));
if (data == null) continue;
SplitLogTask slt = SplitLogTask.parseFrom(data);
previousRecoveryMode = slt.getMode();
if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
// created by an old code base that doesn't set the recovery mode in the split log task;
// we can safely set LOG_SPLITTING because we're in master initialization code
// before SSH is enabled and there are no outstanding recovering regions
previousRecoveryMode = RecoveryMode.LOG_SPLITTING;
}
break;
} catch (DeserializationException e) {
LOG.warn("Failed parse data for znode " + task, e);
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}
}
}
}
synchronized (this) {
if (this.isDrainingDone) {
return;
}
if (!hasSplitLogTask && !hasRecoveringRegions) {
this.isDrainingDone = true;
this.recoveryMode = recoveryModeInConfig;
return;
} else if (!isForInitialization) {
// split log tasks haven't drained yet, keep the existing recovery mode
return;
}
if (previousRecoveryMode != RecoveryMode.UNKNOWN) {
this.isDrainingDone = (previousRecoveryMode == recoveryModeInConfig);
this.recoveryMode = previousRecoveryMode;
} else {
this.recoveryMode = recoveryModeInConfig;
}
}
}
public RecoveryMode getRecoveryMode() {
return this.recoveryMode;
}
/**
* Returns whether distributed log replay is turned on
* @param conf the configuration to check
* @return true when distributed log replay is turned on
*/
private boolean isDistributedLogReplay(Configuration conf) {
boolean dlr = conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
if (LOG.isDebugEnabled()) {
LOG.debug("Distributed log replay=" + dlr + ", " + HFile.FORMAT_VERSION_KEY + "=" + version);
}
// For distributed log replay, hfile version must be 3 at least; we need tag support.
return dlr && (version >= 3);
}
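
For reference, a sketch of the two settings this check reads. The literal key strings mirror HConstants.DISTRIBUTED_LOG_REPLAY_KEY and HFile.FORMAT_VERSION_KEY as assumed for this branch; treat them as assumptions rather than canon.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class DlrConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Assumed key names (see note above); both must hold for LOG_REPLAY mode.
    conf.setBoolean("hbase.master.distributed.log.replay", true);
    conf.setInt("hfile.format.version", 3);  // replay needs tag support (HFile v3+)
  }
}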
/**
* Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().

MetaServerShutdownHandler.java

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.DeadServer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.zookeeper.KeeperException;
@ -62,10 +63,13 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler {
boolean gotException = true;
try {
AssignmentManager am = this.services.getAssignmentManager();
this.services.getMasterFileSystem().setLogRecoveryMode();
boolean distributedLogReplay =
(this.services.getMasterFileSystem().getLogRecoveryMode() == RecoveryMode.LOG_REPLAY);
try {
if (this.shouldSplitHlog) {
LOG.info("Splitting hbase:meta logs for " + serverName);
if (this.distributedLogReplay) {
if (distributedLogReplay) {
Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
regions.add(HRegionInfo.FIRST_META_REGIONINFO);
this.services.getMasterFileSystem().prepareLogReplay(serverName, regions);
@ -97,7 +101,7 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler {
}
try {
if (this.shouldSplitHlog && this.distributedLogReplay) {
if (this.shouldSplitHlog && distributedLogReplay) {
if (!am.waitOnRegionToClearRegionsInTransition(HRegionInfo.FIRST_META_REGIONINFO,
regionAssignmentWaitTimeout)) {
// Wait here is to avoid log replay hits current dead server and incur a RPC timeout

ServerShutdownHandler.java

@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.zookeeper.KeeperException;
@ -64,7 +65,6 @@ public class ServerShutdownHandler extends EventHandler {
protected final MasterServices services;
protected final DeadServer deadServers;
protected final boolean shouldSplitHlog; // whether to split HLog or not
protected final boolean distributedLogReplay;
protected final int regionAssignmentWaitTimeout;
public ServerShutdownHandler(final Server server, final MasterServices services,
@ -86,7 +86,6 @@ public class ServerShutdownHandler extends EventHandler {
LOG.warn(this.serverName + " is NOT in deadservers; it should be!");
}
this.shouldSplitHlog = shouldSplitHlog;
this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(server.getConfiguration());
this.regionAssignmentWaitTimeout = server.getConfiguration().getInt(
HConstants.LOG_REPLAY_WAIT_REGION_TIMEOUT, 15000);
}
@ -183,10 +182,16 @@ public class ServerShutdownHandler extends EventHandler {
throw new IOException("Server is stopped");
}
// setting the recovery mode from configuration is delayed until all outstanding
// split log tasks have drained
this.services.getMasterFileSystem().setLogRecoveryMode();
boolean distributedLogReplay =
(this.services.getMasterFileSystem().getLogRecoveryMode() == RecoveryMode.LOG_REPLAY);
try {
if (this.shouldSplitHlog) {
LOG.info("Splitting logs for " + serverName + " before assignment.");
if (this.distributedLogReplay) {
if (distributedLogReplay) {
LOG.info("Mark regions in recovery before assignment.");
Set<ServerName> serverNames = new HashSet<ServerName>();
serverNames.add(serverName);
@ -286,7 +291,7 @@ public class ServerShutdownHandler extends EventHandler {
throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
}
if (this.shouldSplitHlog && this.distributedLogReplay) {
if (this.shouldSplitHlog && distributedLogReplay) {
// wait for region assignment completes
for (HRegionInfo hri : toAssignRegions) {
try {

HRegionServer.java

@ -364,9 +364,6 @@ public class HRegionServer extends HasThread implements
private RegionServerProcedureManagerHost rspmHost;
// configuration setting on if replay WAL edits directly to another RS
protected final boolean distributedLogReplay;
// Table level lock manager for locking for region operations
protected TableLockManager tableLockManager;
@ -447,7 +444,6 @@ public class HRegionServer extends HasThread implements
this.startcode = System.currentTimeMillis();
String hostName = rpcServices.isa.getHostName();
serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode);
this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf);
// login the zookeeper client principal (if using security)
ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
@ -631,9 +627,7 @@ public class HRegionServer extends HasThread implements
this.abort("Failed to reach zk cluster when creating procedure handler.", e);
}
// register watcher for recovering regions
if(this.distributedLogReplay) {
this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
}
this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
}
/**

View File

@ -1182,6 +1182,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
+ regionServer.serverName));
}
}
OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
final int regionCount = request.getOpenInfoCount();
final Map<TableName, HTableDescriptor> htds =
@ -1258,10 +1259,20 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (previous == null) {
// check if the region to be opened is marked in recovering state in ZK
if (regionServer.distributedLogReplay
&& SplitLogManager.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(),
region.getEncodedName())) {
regionServer.recoveringRegions.put(region.getEncodedName(), null);
if (SplitLogManager.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(),
region.getEncodedName())) {
// check if the current region open is for distributedLogReplay. This check supports
// rolling restart/upgrade, where we want the Master and RS to see the same configuration
if (regionOpenInfo.hasOpenForDistributedLogReplay()
&& regionOpenInfo.getOpenForDistributedLogReplay()) {
regionServer.recoveringRegions.put(region.getEncodedName(), null);
} else {
// remove the stale recovering region from ZK when the region is opened not for recovery,
// which can happen when distributedLogReplay is turned from on to off.
List<String> tmpRegions = new ArrayList<String>();
tmpRegions.add(region.getEncodedName());
SplitLogManager.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(), tmpRegions);
}
}
// If there is no action in progress, we can submit a specific handler.
// Need to pass the expected version in the constructor.
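The nested check above encodes the rolling-restart compatibility rule: a region is tracked as recovering only when the master explicitly requested a replay-style open, and a leftover recovering znode is deleted otherwise. A condensed sketch using only the calls visible in this hunk, with zkw standing in for regionServer.getZooKeeper():
// Hedged sketch of the open-region decision (condensed from the hunk above).
boolean replayOpen = regionOpenInfo.hasOpenForDistributedLogReplay()
    && regionOpenInfo.getOpenForDistributedLogReplay();
if (SplitLogManager.isRegionMarkedRecoveringInZK(zkw, region.getEncodedName())) {
  if (replayOpen) {
    // master runs LOG_REPLAY: track the region so edits are replayed into it
    regionServer.recoveringRegions.put(region.getEncodedName(), null);
  } else {
    // master no longer replays: the znode is stale, remove it and open normally
    SplitLogManager.deleteRecoveringRegionZNodes(zkw,
        Collections.singletonList(region.getEncodedName()));
  }
}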

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
@ -44,6 +45,8 @@ import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
@ -125,7 +128,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
final RegionServerServices server, final LastSequenceId sequenceIdChecker) {
this(watcher, conf, server, new TaskExecutor() {
@Override
public Status exec(String filename, CancelableProgressable p) {
public Status exec(String filename, RecoveryMode mode, CancelableProgressable p) {
Path rootdir;
FileSystem fs;
try {
@ -140,7 +143,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
// encountered a bad non-retry-able persistent error.
try {
if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
fs, conf, p, sequenceIdChecker, watcher, server.getCoordinatedStateManager())) {
fs, conf, p, sequenceIdChecker, watcher, server.getCoordinatedStateManager(), mode)) {
return Status.PREEMPTED;
}
} catch (InterruptedIOException iioe) {
@ -174,11 +177,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
try {
LOG.info("SplitLogWorker " + this.serverName + " starting");
this.watcher.registerListener(this);
boolean distributedLogReplay = HLogSplitter.isDistributedLogReplay(conf);
if (distributedLogReplay) {
// initialize a new connection for splitlogworker configuration
HConnectionManager.getConnection(conf);
}
// pre-initialize a new connection for splitlogworker configuration
HConnectionManager.getConnection(conf);
// wait for master to create the splitLogZnode
int res = -1;
@ -301,7 +301,6 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
*/
private void grabTask(String path) {
Stat stat = new Stat();
long t = -1;
byte[] data;
synchronized (grabTaskLock) {
currentTask = path;
@ -334,14 +333,15 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
return;
}
currentVersion = attemptToOwnTask(true, watcher, serverName, path, stat.getVersion());
currentVersion = attemptToOwnTask(true, watcher, serverName, path, slt.getMode(),
stat.getVersion());
if (currentVersion < 0) {
SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
return;
}
if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
HLogSplitterHandler.endTask(watcher, new SplitLogTask.Done(this.serverName),
HLogSplitterHandler.endTask(watcher, new SplitLogTask.Done(this.serverName, slt.getMode()),
SplitLogCounters.tot_wkr_task_acquired_rescan, currentTask, currentVersion);
return;
}
@ -350,7 +350,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
getDataSetWatchAsync();
submitTask(path, currentVersion, this.report_period);
submitTask(path, slt.getMode(), currentVersion, this.report_period);
// after a successful submit, sleep a little bit to allow other RSs to grab the remaining tasks
try {
@ -385,10 +385,10 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
* @return a non-negative integer when the task can be owned by the current region server, otherwise -1
*/
protected static int attemptToOwnTask(boolean isFirstTime, ZooKeeperWatcher zkw,
ServerName server, String task, int taskZKVersion) {
ServerName server, String task, RecoveryMode mode, int taskZKVersion) {
int latestZKVersion = FAILED_TO_OWN_TASK;
try {
SplitLogTask slt = new SplitLogTask.Owned(server);
SplitLogTask slt = new SplitLogTask.Owned(server, mode);
Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion);
if (stat == null) {
LOG.warn("zk.setData() returned null for path " + task);
@ -445,7 +445,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
* @param curTask
* @param curTaskZKVersion
*/
void submitTask(final String curTask, final int curTaskZKVersion, final int reportPeriod) {
void submitTask(final String curTask, final RecoveryMode mode, final int curTaskZKVersion,
final int reportPeriod) {
final MutableInt zkVersion = new MutableInt(curTaskZKVersion);
CancelableProgressable reporter = new CancelableProgressable() {
@ -456,8 +457,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
long t = EnvironmentEdgeManager.currentTimeMillis();
if ((t - last_report_at) > reportPeriod) {
last_report_at = t;
int latestZKVersion =
attemptToOwnTask(false, watcher, serverName, curTask, zkVersion.intValue());
int latestZKVersion = attemptToOwnTask(false, watcher, serverName, curTask, mode,
zkVersion.intValue());
if (latestZKVersion < 0) {
LOG.warn("Failed to heartbeat the task" + curTask);
return false;
@ -468,9 +469,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
}
};
HLogSplitterHandler hsh =
new HLogSplitterHandler(this.server, curTask, zkVersion, reporter, this.tasksInProgress,
this.splitTaskExecutor);
HLogSplitterHandler hsh = new HLogSplitterHandler(this.server, curTask, zkVersion, reporter,
this.tasksInProgress, this.splitTaskExecutor, mode);
this.executorService.submit(hsh);
}
@ -640,6 +640,6 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
RESIGNED(),
PREEMPTED()
}
Status exec(String name, CancelableProgressable p);
Status exec(String name, RecoveryMode mode, CancelableProgressable p);
}
}
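With the widened exec signature above, an executor learns the recovery mode from the task payload rather than from its own configuration, so master and worker stay in agreement even when their local settings differ mid-rolling-restart. A minimal conforming TaskExecutor, assuming a Commons Logging LOG field (illustrative only, not part of this commit):
// Hedged sketch: the mode arrives with each task instead of being read from conf.
SplitLogWorker.TaskExecutor executor = new SplitLogWorker.TaskExecutor() {
  @Override
  public Status exec(String filename, RecoveryMode mode, CancelableProgressable p) {
    LOG.info("Processing " + filename + " in mode " + mode);
    // a real executor would call HLogSplitter.splitLogFile(..., mode) as shown above
    return Status.DONE;
  }
};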

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
import org.apache.hadoop.hbase.util.CancelableProgressable;
@ -55,11 +56,12 @@ public class HLogSplitterHandler extends EventHandler {
private final AtomicInteger inProgressTasks;
private final MutableInt curTaskZKVersion;
private final TaskExecutor splitTaskExecutor;
private final RecoveryMode mode;
public HLogSplitterHandler(final Server server, String curTask,
final MutableInt curTaskZKVersion,
CancelableProgressable reporter,
AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor) {
AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor, RecoveryMode mode) {
super(server, EventType.RS_LOG_REPLAY);
this.curTask = curTask;
this.wal = ZKSplitLog.getFileName(curTask);
@ -70,16 +72,17 @@ public class HLogSplitterHandler extends EventHandler {
this.zkw = server.getZooKeeper();
this.curTaskZKVersion = curTaskZKVersion;
this.splitTaskExecutor = splitTaskExecutor;
this.mode = mode;
}
@Override
public void process() throws IOException {
long startTime = System.currentTimeMillis();
try {
Status status = this.splitTaskExecutor.exec(wal, reporter);
Status status = this.splitTaskExecutor.exec(wal, mode, reporter);
switch (status) {
case DONE:
endTask(zkw, new SplitLogTask.Done(this.serverName),
endTask(zkw, new SplitLogTask.Done(this.serverName, this.mode),
SplitLogCounters.tot_wkr_task_done, curTask, curTaskZKVersion.intValue());
break;
case PREEMPTED:
@ -88,7 +91,7 @@ public class HLogSplitterHandler extends EventHandler {
break;
case ERR:
if (server != null && !server.isStopped()) {
endTask(zkw, new SplitLogTask.Err(this.serverName),
endTask(zkw, new SplitLogTask.Err(this.serverName, this.mode),
SplitLogCounters.tot_wkr_task_err, curTask, curTaskZKVersion.intValue());
break;
}
@ -99,7 +102,7 @@ public class HLogSplitterHandler extends EventHandler {
if (server != null && server.isStopped()) {
LOG.info("task execution interrupted because worker is exiting " + curTask);
}
endTask(zkw, new SplitLogTask.Resigned(this.serverName),
endTask(zkw, new SplitLogTask.Resigned(this.serverName, this.mode),
SplitLogCounters.tot_wkr_task_resigned, curTask, curTaskZKVersion.intValue());
break;
}
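Every terminal state written back to ZooKeeper above (Done, Err, Resigned) now echoes the mode the task ran under, so the mode survives in the znode payloads; this is presumably what lets a restarting master recover the previous recovery mode from outstanding tasks (see the TestSplitLogManager change below). In short, with serverName and mode in scope:
// Hedged sketch: each outcome carries the mode it was executed under.
SplitLogTask done     = new SplitLogTask.Done(serverName, mode);
SplitLogTask error    = new SplitLogTask.Err(serverName, mode);
SplitLogTask resigned = new SplitLogTask.Resigned(serverName, mode);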

View File

@ -94,6 +94,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.LastSequenceId;
@ -172,7 +173,7 @@ public class HLogSplitter {
HLogSplitter(Configuration conf, Path rootDir,
FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw,
CoordinatedStateManager csm) {
CoordinatedStateManager csm, RecoveryMode mode) {
this.conf = HBaseConfiguration.create(conf);
String codecClassName = conf
.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
@ -190,7 +191,7 @@ public class HLogSplitter {
// a larger minBatchSize may slow down recovery because the replay writer has to wait for
// enough edits before replaying them
this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf);
this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);
this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
if (zkw != null && csm != null && this.distributedLogReplay) {
@ -224,9 +225,8 @@ public class HLogSplitter {
*/
public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
ZooKeeperWatcher zkw, CoordinatedStateManager cp) throws IOException {
HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw,
cp);
ZooKeeperWatcher zkw, CoordinatedStateManager cp, RecoveryMode mode) throws IOException {
HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw, cp, mode);
return s.splitLogFile(logfile, reporter);
}
@ -240,7 +240,8 @@ public class HLogSplitter {
List<Path> splits = new ArrayList<Path>();
if (logfiles != null && logfiles.length > 0) {
for (FileStatus logfile: logfiles) {
HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, null);
HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, null,
RecoveryMode.LOG_SPLITTING);
if (s.splitLogFile(logfile, null)) {
finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
if (s.outputSink.splits != null) {
@ -1980,20 +1981,4 @@ public class HLogSplitter {
return mutations;
}
/**
* Returns if distributed log replay is turned on or not
* @param conf
* @return true when distributed log replay is turned on
*/
public static boolean isDistributedLogReplay(Configuration conf) {
boolean dlr = conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
if (LOG.isDebugEnabled()) {
LOG.debug("Distributed log replay=" + dlr + ", " + HFile.FORMAT_VERSION_KEY + "=" + version);
}
// For distributed log replay, hfile version must be 3 at least; we need tag support.
return dlr && (version >= 3);
}
}
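With isDistributedLogReplay(conf) deleted, HLogSplitter no longer infers the mode from configuration; every caller states it explicitly, as the static split() path above already does. A minimal sketch of a plain, non-replay invocation under the new constructor:
// Hedged sketch: a direct split always names LOG_SPLITTING explicitly.
HLogSplitter splitter = new HLogSplitter(conf, rootDir, fs,
    null /* LastSequenceId */, null /* ZooKeeperWatcher */,
    null /* CoordinatedStateManager */, RecoveryMode.LOG_SPLITTING);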

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.DataInputBuffer;
@ -120,7 +121,8 @@ public class TestSerialization {
@Test
public void testSplitLogTask() throws DeserializationException {
SplitLogTask slt = new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"));
SplitLogTask slt = new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"),
RecoveryMode.LOG_REPLAY);
byte [] bytes = slt.toByteArray();
SplitLogTask sltDeserialized = SplitLogTask.parseFrom(bytes);
assertTrue(slt.equals(sltDeserialized));

View File

@ -68,7 +68,7 @@ public class TestMultiParallel {
private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
private static final byte [][] KEYS = makeKeys();
private static final int slaves = 3; // also used for testing HTable pool size
private static final int slaves = 5; // also used for testing HTable pool size
@BeforeClass public static void beforeClass() throws Exception {
((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
@ -692,4 +692,4 @@ public class TestMultiParallel {
validateEmpty(result);
}
}
}
}

View File

@ -75,6 +75,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -660,10 +661,14 @@ public class TestAssignmentManager {
DeadServer deadServers = new DeadServer();
deadServers.add(SERVERNAME_A);
// I need a services instance that will return the AM
MasterFileSystem fs = Mockito.mock(MasterFileSystem.class);
Mockito.doNothing().when(fs).setLogRecoveryMode();
Mockito.when(fs.getLogRecoveryMode()).thenReturn(RecoveryMode.LOG_REPLAY);
MasterServices services = Mockito.mock(MasterServices.class);
Mockito.when(services.getAssignmentManager()).thenReturn(am);
Mockito.when(services.getServerManager()).thenReturn(this.serverManager);
Mockito.when(services.getZooKeeper()).thenReturn(this.watcher);
Mockito.when(services.getMasterFileSystem()).thenReturn(fs);
ServerShutdownHandler handler = new ServerShutdownHandler(this.server,
services, deadServers, SERVERNAME_A, false);
am.failoverCleanupDone.set(true);

View File

@ -94,8 +94,8 @@ public class TestMasterFileSystem {
// Create a ZKW to use in the test
ZooKeeperWatcher zkw = HBaseTestingUtility.getZooKeeperWatcher(UTIL);
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, walPath),
new SplitLogTask.Owned(inRecoveryServerName).toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
new SplitLogTask.Owned(inRecoveryServerName, fs.getLogRecoveryMode()).toByteArray(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
String staleRegionPath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, staleRegion);
ZKUtil.createWithParents(zkw, staleRegionPath);
String inRecoveringRegionPath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, failedRegion);

View File

@ -48,16 +48,21 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@ -90,6 +95,7 @@ public class TestSplitLogManager {
private SplitLogManager slm;
private Configuration conf;
private int to;
private RecoveryMode mode;
private static HBaseTestingUtility TEST_UTIL;
@ -134,6 +140,9 @@ public class TestSplitLogManager {
conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
to = to + 4 * 100;
this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
}
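The ternary added to setup() recurs in several tests below (TestSplitLogWorker, TestHLogMethods, TestHLogSplit, TestWALReplay); a small helper expressing the idiom (helper name hypothetical, not part of this commit):
// Hedged sketch: map the boolean config onto the protobuf RecoveryMode enum.
static RecoveryMode modeFromConf(Configuration conf) {
  return conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false)
      ? RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING;
}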
@After
@ -213,7 +222,7 @@ public class TestSplitLogManager {
LOG.info("TestOrphanTaskAcquisition");
String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
SplitLogTask slt = new SplitLogTask.Owned(DUMMY_MASTER);
SplitLogTask slt = new SplitLogTask.Owned(DUMMY_MASTER, this.mode);
zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
@ -238,7 +247,7 @@ public class TestSplitLogManager {
" startup");
String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
//create an unassigned orphan task
SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER);
SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER, this.mode);
zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
int version = ZKUtil.checkExists(zkw, tasknode);
@ -274,19 +283,19 @@ public class TestSplitLogManager {
final ServerName worker1 = ServerName.valueOf("worker1,1,1");
final ServerName worker2 = ServerName.valueOf("worker2,1,1");
final ServerName worker3 = ServerName.valueOf("worker3,1,1");
SplitLogTask slt = new SplitLogTask.Owned(worker1);
SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
int version1 = ZKUtil.checkExists(zkw, tasknode);
assertTrue(version1 > version);
slt = new SplitLogTask.Owned(worker2);
slt = new SplitLogTask.Owned(worker2, this.mode);
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
waitForCounter(tot_mgr_heartbeat, 1, 2, to/2);
waitForCounter(tot_mgr_resubmit, 1, 2, to + to/2);
int version2 = ZKUtil.checkExists(zkw, tasknode);
assertTrue(version2 > version1);
slt = new SplitLogTask.Owned(worker3);
slt = new SplitLogTask.Owned(worker3, this.mode);
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
waitForCounter(tot_mgr_heartbeat, 2, 3, to/2);
waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2);
@ -304,7 +313,7 @@ public class TestSplitLogManager {
String tasknode = submitTaskAndWait(batch, "foo/1");
int version = ZKUtil.checkExists(zkw, tasknode);
final ServerName worker1 = ServerName.valueOf("worker1,1,1");
SplitLogTask slt = new SplitLogTask.Owned(worker1);
SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
waitForCounter(new Expr() {
@ -331,7 +340,7 @@ public class TestSplitLogManager {
TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1");
final ServerName worker1 = ServerName.valueOf("worker1,1,1");
SplitLogTask slt = new SplitLogTask.Done(worker1);
SplitLogTask slt = new SplitLogTask.Done(worker1, this.mode);
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
synchronized (batch) {
while (batch.installed != batch.done) {
@ -352,7 +361,7 @@ public class TestSplitLogManager {
String tasknode = submitTaskAndWait(batch, "foo/1");
final ServerName worker1 = ServerName.valueOf("worker1,1,1");
SplitLogTask slt = new SplitLogTask.Err(worker1);
SplitLogTask slt = new SplitLogTask.Err(worker1, this.mode);
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
synchronized (batch) {
@ -376,7 +385,7 @@ public class TestSplitLogManager {
assertEquals(tot_mgr_resubmit.get(), 0);
final ServerName worker1 = ServerName.valueOf("worker1,1,1");
assertEquals(tot_mgr_resubmit.get(), 0);
SplitLogTask slt = new SplitLogTask.Resigned(worker1);
SplitLogTask slt = new SplitLogTask.Resigned(worker1, this.mode);
assertEquals(tot_mgr_resubmit.get(), 0);
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
int version = ZKUtil.checkExists(zkw, tasknode);
@ -399,7 +408,7 @@ public class TestSplitLogManager {
// create an orphan task in OWNED state
String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
final ServerName worker1 = ServerName.valueOf("worker1,1,1");
SplitLogTask slt = new SplitLogTask.Owned(worker1);
SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
@ -414,7 +423,7 @@ public class TestSplitLogManager {
for (int i = 0; i < (3 * to)/100; i++) {
Thread.sleep(100);
final ServerName worker2 = ServerName.valueOf("worker1,1,1");
slt = new SplitLogTask.Owned(worker2);
slt = new SplitLogTask.Owned(worker2, this.mode);
ZKUtil.setData(zkw, tasknode1, slt.toByteArray());
}
@ -438,7 +447,7 @@ public class TestSplitLogManager {
String tasknode = submitTaskAndWait(batch, "foo/1");
int version = ZKUtil.checkExists(zkw, tasknode);
final ServerName worker1 = ServerName.valueOf("worker1,1,1");
SplitLogTask slt = new SplitLogTask.Owned(worker1);
SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
slm.handleDeadWorker(worker1);
@ -463,7 +472,7 @@ public class TestSplitLogManager {
String tasknode = submitTaskAndWait(batch, "foo/1");
final ServerName worker1 = ServerName.valueOf("worker1,1,1");
SplitLogTask slt = new SplitLogTask.Owned(worker1);
SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
@ -513,4 +522,25 @@ public class TestSplitLogManager {
assertTrue("Recovery regions isn't cleaned", recoveringRegions.isEmpty());
}
@Test(timeout=60000)
public void testGetPreviousRecoveryMode() throws Exception {
LOG.info("testGetPreviousRecoveryMode");
SplitLogCounters.resetCounters();
Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
testConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"),
new SplitLogTask.Unassigned(
ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
slm = new SplitLogManager(zkw, testConf, stopper, master, DUMMY_MASTER);
assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_SPLITTING);
zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1);
slm.setRecoveryMode(false);
assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_REPLAY);
}
}

View File

@ -132,7 +132,7 @@ public class TestRegionServerNoMaster {
ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
// first version is '0'
AdminProtos.OpenRegionRequest orr =
RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 0, null);
RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 0, null, null);
AdminProtos.OpenRegionResponse responseOpen = getRS().rpcServices.openRegion(null, orr);
Assert.assertTrue(responseOpen.getOpeningStateCount() == 1);
Assert.assertTrue(responseOpen.getOpeningState(0).
@ -251,7 +251,8 @@ public class TestRegionServerNoMaster {
// We're sending multiple requests in a row. The region server must handle this nicely.
for (int i = 0; i < 10; i++) {
AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 0, null);
AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
getRS().getServerName(), hri, 0, null, null);
AdminProtos.OpenRegionResponse responseOpen = getRS().rpcServices.openRegion(null, orr);
Assert.assertTrue(responseOpen.getOpeningStateCount() == 1);
@ -277,7 +278,7 @@ public class TestRegionServerNoMaster {
// fake that the region is closing now; we need to clear this state afterwards
getRS().regionsInTransitionInRS.put(hri.getEncodedNameAsBytes(), Boolean.FALSE);
AdminProtos.OpenRegionRequest orr =
RequestConverter.buildOpenRegionRequest(sn, hri, 0, null);
RequestConverter.buildOpenRegionRequest(sn, hri, 0, null, null);
getRS().rpcServices.openRegion(null, orr);
Assert.fail("The closing region should not be opened");
} catch (ServiceException se) {
@ -454,7 +455,8 @@ public class TestRegionServerNoMaster {
//actual close
closeNoZK();
try {
AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(earlierServerName, hri, 0, null);
AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
earlierServerName, hri, 0, null, null);
getRS().getRSRpcServices().openRegion(null, orr);
Assert.fail("The openRegion should have been rejected");
} catch (ServiceException se) {
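All of the call sites updated above pass null for the new final argument, which leaves the protobuf flag unset and is treated as an ordinary open; an open issued as part of distributed log replay would pass Boolean.TRUE instead. A sketch under that assumption:
// Hedged sketch: a replay-style open sets the flag so the RS keeps the region in
// recoveringRegions instead of deleting the recovering znode.
AdminProtos.OpenRegionRequest replayOrr = RequestConverter.buildOpenRegionRequest(
    getRS().getServerName(), hri, 0, null /* favoredNodes */, Boolean.TRUE);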

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -31,6 +32,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
@ -38,6 +40,8 @@ import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@ -64,6 +68,7 @@ public class TestSplitLogWorker {
private ZooKeeperWatcher zkw;
private SplitLogWorker slw;
private ExecutorService executorService;
private RecoveryMode mode;
private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems)
throws Exception {
@ -98,6 +103,7 @@ public class TestSplitLogWorker {
@Before
public void setup() throws Exception {
TEST_UTIL.startMiniZKCluster();
Configuration conf = TEST_UTIL.getConfiguration();
zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
"split-log-worker-tests", null);
ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode);
@ -112,6 +118,8 @@ public class TestSplitLogWorker {
SplitLogCounters.resetCounters();
executorService = new ExecutorService("TestSplitLogWorker");
executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10);
this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
}
@After
@ -126,7 +134,7 @@ public class TestSplitLogWorker {
new SplitLogWorker.TaskExecutor() {
@Override
public Status exec(String name, CancelableProgressable p) {
public Status exec(String name, RecoveryMode mode, CancelableProgressable p) {
while (true) {
try {
Thread.sleep(1000);
@ -149,7 +157,8 @@ public class TestSplitLogWorker {
final ServerName RS = ServerName.valueOf("rs,1,1");
RegionServerServices mockedRS = getRegionServer(RS);
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), Ids.OPEN_ACL_UNSAFE,
new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
SplitLogWorker slw =
@ -184,8 +193,8 @@ public class TestSplitLogWorker {
final ServerName SVR1 = ServerName.valueOf("svr1,1,1");
final ServerName SVR2 = ServerName.valueOf("svr2,1,1");
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
RegionServerServices mockedRS1 = getRegionServer(SVR1);
RegionServerServices mockedRS2 = getRegionServer(SVR2);
SplitLogWorker slw1 =
@ -227,15 +236,15 @@ public class TestSplitLogWorker {
// this time create a task node after starting the splitLogWorker
zkw.getRecoverableZooKeeper().create(PATH,
new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
assertEquals(1, slw.taskReadySeq);
byte [] bytes = ZKUtil.getData(zkw, PATH);
SplitLogTask slt = SplitLogTask.parseFrom(bytes);
assertTrue(slt.isOwned(SRV));
slt = new SplitLogTask.Owned(MANAGER);
slt = new SplitLogTask.Owned(MANAGER, this.mode);
ZKUtil.setData(zkw, PATH, slt.toByteArray());
waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
} finally {
@ -258,7 +267,8 @@ public class TestSplitLogWorker {
Thread.sleep(100);
waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
SplitLogTask unassignedManager = new SplitLogTask.Unassigned(MANAGER);
SplitLogTask unassignedManager =
new SplitLogTask.Unassigned(MANAGER, this.mode);
zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
@ -272,7 +282,7 @@ public class TestSplitLogWorker {
// preempt the first task, have it owned by another worker
final ServerName anotherWorker = ServerName.valueOf("another-worker,1,1");
SplitLogTask slt = new SplitLogTask.Owned(anotherWorker);
SplitLogTask slt = new SplitLogTask.Owned(anotherWorker, this.mode);
ZKUtil.setData(zkw, PATH1, slt.toByteArray());
waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
@ -298,7 +308,7 @@ public class TestSplitLogWorker {
Thread.sleep(100);
String task = ZKSplitLog.getEncodedNodeName(zkw, "task");
SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER);
SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER, this.mode);
zkw.getRecoverableZooKeeper().create(task,slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
@ -351,8 +361,8 @@ public class TestSplitLogWorker {
for (int i = 0; i < maxTasks; i++) {
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
@ -394,9 +404,8 @@ public class TestSplitLogWorker {
for (int i = 0; i < maxTasks; i++) {
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);

View File

@ -444,6 +444,7 @@ public class TestSplitTransactionOnCluster {
AssignmentManager.TEST_SKIP_SPLIT_HANDLING = false;
admin.setBalancerRunning(true, false);
cluster.getMaster().setCatalogJanitorEnabled(true);
cluster.startRegionServer();
t.close();
}
}

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.EntryBuffers;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.RegionEntryBuffer;
import org.apache.hadoop.hbase.util.Bytes;
@ -111,8 +112,10 @@ public class TestHLogMethods {
@Test
public void testEntrySink() throws Exception {
Configuration conf = new Configuration();
RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
HLogSplitter splitter = new HLogSplitter(
conf, mock(Path.class), mock(FileSystem.class), null, null, null);
conf, mock(Path.class), mock(FileSystem.class), null, null, null, mode);
EntryBuffers sink = splitter.new EntryBuffers(1*1024*1024);
for (int i = 0; i < 1000; i++) {

View File

@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
@ -127,6 +128,7 @@ public class TestHLogSplit {
private static String ROBBER;
private static String ZOMBIE;
private static String [] GROUP = new String [] {"supergroup"};
private RecoveryMode mode;
static enum Corruptions {
INSERT_GARBAGE_ON_FIRST_LINE,
@ -177,6 +179,8 @@ public class TestHLogSplit {
REGIONS.clear();
Collections.addAll(REGIONS, "bbb", "ccc");
InstrumentedSequenceFileLogWriter.activateFailure = false;
this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
}
@After
@ -805,7 +809,7 @@ public class TestHLogSplit {
logfiles != null && logfiles.length > 0);
// Set up a splitter that will throw an IOE on the output side
HLogSplitter logSplitter = new HLogSplitter(
conf, HBASEDIR, fs, null, null, null) {
conf, HBASEDIR, fs, null, null, null, this.mode) {
protected HLog.Writer createWriter(FileSystem fs,
Path logfile, Configuration conf) throws IOException {
HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
@ -938,7 +942,7 @@ public class TestHLogSplit {
try {
conf.setInt("hbase.splitlog.report.period", 1000);
boolean ret = HLogSplitter.splitLogFile(
HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, null);
HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, null, this.mode);
assertFalse("Log splitting should failed", ret);
assertTrue(count.get() > 0);
} catch (IOException e) {
@ -997,7 +1001,7 @@ public class TestHLogSplit {
// Create a splitter that reads and writes the data without touching disk
HLogSplitter logSplitter = new HLogSplitter(
localConf, HBASEDIR, fs, null, null, null) {
localConf, HBASEDIR, fs, null, null, null, this.mode) {
/* Produce a mock writer that doesn't write anywhere */
protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
@ -1282,7 +1286,7 @@ public class TestHLogSplit {
logfiles != null && logfiles.length > 0);
HLogSplitter logSplitter = new HLogSplitter(
conf, HBASEDIR, fs, null, null, null) {
conf, HBASEDIR, fs, null, null, null, this.mode) {
protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
throws IOException {
HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, logfile, conf);

View File

@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
import org.apache.hadoop.hbase.regionserver.FlushRequestListener;
@ -98,6 +99,8 @@ public class TestWALReplay {
private Path logDir;
private FileSystem fs;
private Configuration conf;
private RecoveryMode mode;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@ -128,6 +131,8 @@ public class TestWALReplay {
if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
}
this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
}
@After
@ -873,7 +878,7 @@ public class TestWALReplay {
wal.close();
FileStatus[] listStatus = this.fs.listStatus(wal.getDir());
HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0],
this.fs, this.conf, null, null, null, null);
this.fs, this.conf, null, null, null, null, mode);
FileStatus[] listStatus1 = this.fs.listStatus(
new Path(FSUtils.getTableDir(hbaseRootDir, tableName),
new Path(hri.getEncodedName(), "recovered.edits")));