diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index aca8b867339..f875e7ed890 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -1596,7 +1596,7 @@ public final class ProtobufUtil {
   public static void openRegion(final AdminService.BlockingInterface admin,
       ServerName server, final HRegionInfo region) throws IOException {
     OpenRegionRequest request =
-      RequestConverter.buildOpenRegionRequest(server, region, -1, null);
+      RequestConverter.buildOpenRegionRequest(server, region, -1, null, null);
     try {
       admin.openRegion(null, request);
     } catch (ServiceException se) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
index a1b548d4a5b..381ddda925b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.util.List;
 
 import com.google.protobuf.HBaseZeroCopyByteString;
+
+import org.apache.commons.configuration.Configuration;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -702,17 +704,18 @@ public final class RequestConverter {
    * Create a protocol buffer OpenRegionRequest to open a list of regions
    *
    * @param regionOpenInfos info of a list of regions to open
+   * @param openForReplay whether the regions are opened for distributed log replay
    * @return a protocol buffer OpenRegionRequest
    */
   public static OpenRegionRequest buildOpenRegionRequest(final
-      List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos) {
+      List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos, Boolean openForReplay) {
     OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
     for (Triple<HRegionInfo, Integer, List<ServerName>> regionOpenInfo: regionOpenInfos) {
       Integer second = regionOpenInfo.getSecond();
       int versionOfOfflineNode = second == null ?
-1 : second.intValue(); - builder.addOpenInfo(buildRegionOpenInfo( - regionOpenInfo.getFirst(), versionOfOfflineNode, regionOpenInfo.getThird())); + builder.addOpenInfo(buildRegionOpenInfo(regionOpenInfo.getFirst(), versionOfOfflineNode, + regionOpenInfo.getThird(), openForReplay)); } return builder.build(); } @@ -724,12 +727,15 @@ public final class RequestConverter { * @param region the region to open * @param versionOfOfflineNode that needs to be present in the offline node * @param favoredNodes + * @param openForReplay * @return a protocol buffer OpenRegionRequest */ public static OpenRegionRequest buildOpenRegionRequest(ServerName server, - final HRegionInfo region, final int versionOfOfflineNode, List favoredNodes) { + final HRegionInfo region, final int versionOfOfflineNode, List favoredNodes, + Boolean openForReplay) { OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder(); - builder.addOpenInfo(buildRegionOpenInfo(region, versionOfOfflineNode, favoredNodes)); + builder.addOpenInfo(buildRegionOpenInfo(region, versionOfOfflineNode, favoredNodes, + openForReplay)); if (server != null) { builder.setServerStartCode(server.getStartcode()); } @@ -1493,7 +1499,7 @@ public final class RequestConverter { */ private static RegionOpenInfo buildRegionOpenInfo( final HRegionInfo region, final int versionOfOfflineNode, - final List favoredNodes) { + final List favoredNodes, Boolean openForReplay) { RegionOpenInfo.Builder builder = RegionOpenInfo.newBuilder(); builder.setRegion(HRegionInfo.convert(region)); if (versionOfOfflineNode >= 0) { @@ -1504,6 +1510,9 @@ public final class RequestConverter { builder.addFavoredNodes(ProtobufUtil.toServerName(server)); } } + if(openForReplay != null) { + builder.setOpenForDistributedLogReplay(openForReplay); + } return builder.build(); } } diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java index 0ad10ad7370..636e51f29f2 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java @@ -4032,6 +4032,24 @@ public final class AdminProtos { */ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getFavoredNodesOrBuilder( int index); + + // optional bool openForDistributedLogReplay = 4; + /** + * optional bool openForDistributedLogReplay = 4; + * + *
+       * open region for distributedLogReplay
+       * 
+ */ + boolean hasOpenForDistributedLogReplay(); + /** + * optional bool openForDistributedLogReplay = 4; + * + *
+       * open region for distributedLogReplay
+       * 
+ */ + boolean getOpenForDistributedLogReplay(); } /** * Protobuf type {@code OpenRegionRequest.RegionOpenInfo} @@ -4110,6 +4128,11 @@ public final class AdminProtos { favoredNodes_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.PARSER, extensionRegistry)); break; } + case 32: { + bitField0_ |= 0x00000004; + openForDistributedLogReplay_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -4227,10 +4250,35 @@ public final class AdminProtos { return favoredNodes_.get(index); } + // optional bool openForDistributedLogReplay = 4; + public static final int OPENFORDISTRIBUTEDLOGREPLAY_FIELD_NUMBER = 4; + private boolean openForDistributedLogReplay_; + /** + * optional bool openForDistributedLogReplay = 4; + * + *
+       * open region for distributedLogReplay
+       * 
+ */ + public boolean hasOpenForDistributedLogReplay() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional bool openForDistributedLogReplay = 4; + * + *
+       * open region for distributedLogReplay
+       * 
+ */ + public boolean getOpenForDistributedLogReplay() { + return openForDistributedLogReplay_; + } + private void initFields() { region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo.getDefaultInstance(); versionOfOfflineNode_ = 0; favoredNodes_ = java.util.Collections.emptyList(); + openForDistributedLogReplay_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -4267,6 +4315,9 @@ public final class AdminProtos { for (int i = 0; i < favoredNodes_.size(); i++) { output.writeMessage(3, favoredNodes_.get(i)); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeBool(4, openForDistributedLogReplay_); + } getUnknownFields().writeTo(output); } @@ -4288,6 +4339,10 @@ public final class AdminProtos { size += com.google.protobuf.CodedOutputStream .computeMessageSize(3, favoredNodes_.get(i)); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(4, openForDistributedLogReplay_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -4323,6 +4378,11 @@ public final class AdminProtos { } result = result && getFavoredNodesList() .equals(other.getFavoredNodesList()); + result = result && (hasOpenForDistributedLogReplay() == other.hasOpenForDistributedLogReplay()); + if (hasOpenForDistributedLogReplay()) { + result = result && (getOpenForDistributedLogReplay() + == other.getOpenForDistributedLogReplay()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -4348,6 +4408,10 @@ public final class AdminProtos { hash = (37 * hash) + FAVORED_NODES_FIELD_NUMBER; hash = (53 * hash) + getFavoredNodesList().hashCode(); } + if (hasOpenForDistributedLogReplay()) { + hash = (37 * hash) + OPENFORDISTRIBUTEDLOGREPLAY_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getOpenForDistributedLogReplay()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -4473,6 +4537,8 @@ public final class AdminProtos { } else { favoredNodesBuilder_.clear(); } + openForDistributedLogReplay_ = false; + bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -4522,6 +4588,10 @@ public final class AdminProtos { } else { result.favoredNodes_ = favoredNodesBuilder_.build(); } + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000004; + } + result.openForDistributedLogReplay_ = openForDistributedLogReplay_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -4570,6 +4640,9 @@ public final class AdminProtos { } } } + if (other.hasOpenForDistributedLogReplay()) { + setOpenForDistributedLogReplay(other.getOpenForDistributedLogReplay()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -5001,6 +5074,55 @@ public final class AdminProtos { return favoredNodesBuilder_; } + // optional bool openForDistributedLogReplay = 4; + private boolean openForDistributedLogReplay_ ; + /** + * optional bool openForDistributedLogReplay = 4; + * + *
+         * open region for distributedLogReplay
+         * 
+ */ + public boolean hasOpenForDistributedLogReplay() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional bool openForDistributedLogReplay = 4; + * + *
+         * open region for distributedLogReplay
+         * 
+ */ + public boolean getOpenForDistributedLogReplay() { + return openForDistributedLogReplay_; + } + /** + * optional bool openForDistributedLogReplay = 4; + * + *
+         * open region for distributedLogReplay
+         * 
+ */ + public Builder setOpenForDistributedLogReplay(boolean value) { + bitField0_ |= 0x00000008; + openForDistributedLogReplay_ = value; + onChanged(); + return this; + } + /** + * optional bool openForDistributedLogReplay = 4; + * + *
+         * open region for distributedLogReplay
+         * 
+ */ + public Builder clearOpenForDistributedLogReplay() { + bitField0_ = (bitField0_ & ~0x00000008); + openForDistributedLogReplay_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:OpenRegionRequest.RegionOpenInfo) } @@ -21166,77 +21288,78 @@ public final class AdminProtos { "FileResponse\022\022\n\nstore_file\030\001 \003(\t\"\030\n\026GetO" + "nlineRegionRequest\";\n\027GetOnlineRegionRes" + "ponse\022 \n\013region_info\030\001 \003(\0132\013.RegionInfo\"" + - "\326\001\n\021OpenRegionRequest\0224\n\topen_info\030\001 \003(\013" + + "\374\001\n\021OpenRegionRequest\0224\n\topen_info\030\001 \003(\013" + "2!.OpenRegionRequest.RegionOpenInfo\022\027\n\017s" + - "erverStartCode\030\002 \001(\004\032r\n\016RegionOpenInfo\022\033" + - "\n\006region\030\001 \002(\0132\013.RegionInfo\022\037\n\027version_o" + - "f_offline_node\030\002 \001(\r\022\"\n\rfavored_nodes\030\003 " + - "\003(\0132\013.ServerName\"\235\001\n\022OpenRegionResponse\022", - "=\n\ropening_state\030\001 \003(\0162&.OpenRegionRespo" + - "nse.RegionOpeningState\"H\n\022RegionOpeningS" + - "tate\022\n\n\006OPENED\020\000\022\022\n\016ALREADY_OPENED\020\001\022\022\n\016" + - "FAILED_OPENING\020\002\"\271\001\n\022CloseRegionRequest\022" + - " \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\037\n\027ver" + - "sion_of_closing_node\030\002 \001(\r\022\036\n\020transition" + - "_in_ZK\030\003 \001(\010:\004true\022\'\n\022destination_server" + - "\030\004 \001(\0132\013.ServerName\022\027\n\017serverStartCode\030\005" + - " \001(\004\"%\n\023CloseRegionResponse\022\016\n\006closed\030\001 " + - "\002(\010\"P\n\022FlushRegionRequest\022 \n\006region\030\001 \002(", - "\0132\020.RegionSpecifier\022\030\n\020if_older_than_ts\030" + - "\002 \001(\004\"?\n\023FlushRegionResponse\022\027\n\017last_flu" + - "sh_time\030\001 \002(\004\022\017\n\007flushed\030\002 \001(\010\"K\n\022SplitR" + - "egionRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpe" + - "cifier\022\023\n\013split_point\030\002 \001(\014\"\025\n\023SplitRegi" + - "onResponse\"W\n\024CompactRegionRequest\022 \n\006re" + - "gion\030\001 \002(\0132\020.RegionSpecifier\022\r\n\005major\030\002 " + - "\001(\010\022\016\n\006family\030\003 \001(\014\"\027\n\025CompactRegionResp" + - "onse\"\262\001\n\031UpdateFavoredNodesRequest\022@\n\013up" + - "date_info\030\001 \003(\0132+.UpdateFavoredNodesRequ", - "est.RegionUpdateInfo\032S\n\020RegionUpdateInfo" + - "\022\033\n\006region\030\001 \002(\0132\013.RegionInfo\022\"\n\rfavored" + - "_nodes\030\002 \003(\0132\013.ServerName\".\n\032UpdateFavor" + - "edNodesResponse\022\020\n\010response\030\001 \001(\r\"v\n\023Mer" + - "geRegionsRequest\022\"\n\010region_a\030\001 \002(\0132\020.Reg" + - "ionSpecifier\022\"\n\010region_b\030\002 \002(\0132\020.RegionS" + - "pecifier\022\027\n\010forcible\030\003 \001(\010:\005false\"\026\n\024Mer" + - "geRegionsResponse\"X\n\010WALEntry\022\024\n\003key\030\001 \002" + - "(\0132\007.WALKey\022\027\n\017key_value_bytes\030\002 \003(\014\022\035\n\025" + - "associated_cell_count\030\003 \001(\005\"4\n\030Replicate", - "WALEntryRequest\022\030\n\005entry\030\001 \003(\0132\t.WALEntr" + - "y\"\033\n\031ReplicateWALEntryResponse\"\026\n\024RollWA" + - "LWriterRequest\"0\n\025RollWALWriterResponse\022" + - "\027\n\017region_to_flush\030\001 \003(\014\"#\n\021StopServerRe" + - "quest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServerRespo" + - "nse\"\026\n\024GetServerInfoRequest\"B\n\nServerInf" + - "o\022 \n\013server_name\030\001 
\002(\0132\013.ServerName\022\022\n\nw" + - "ebui_port\030\002 \001(\r\"9\n\025GetServerInfoResponse" + - "\022 \n\013server_info\030\001 \002(\0132\013.ServerInfo2\306\007\n\014A" + - "dminService\022>\n\rGetRegionInfo\022\025.GetRegion", - "InfoRequest\032\026.GetRegionInfoResponse\022;\n\014G" + - "etStoreFile\022\024.GetStoreFileRequest\032\025.GetS" + - "toreFileResponse\022D\n\017GetOnlineRegion\022\027.Ge" + - "tOnlineRegionRequest\032\030.GetOnlineRegionRe" + - "sponse\0225\n\nOpenRegion\022\022.OpenRegionRequest" + - "\032\023.OpenRegionResponse\0228\n\013CloseRegion\022\023.C" + - "loseRegionRequest\032\024.CloseRegionResponse\022" + - "8\n\013FlushRegion\022\023.FlushRegionRequest\032\024.Fl" + - "ushRegionResponse\0228\n\013SplitRegion\022\023.Split" + - "RegionRequest\032\024.SplitRegionResponse\022>\n\rC", - "ompactRegion\022\025.CompactRegionRequest\032\026.Co" + - "mpactRegionResponse\022;\n\014MergeRegions\022\024.Me" + - "rgeRegionsRequest\032\025.MergeRegionsResponse" + - "\022J\n\021ReplicateWALEntry\022\031.ReplicateWALEntr" + - "yRequest\032\032.ReplicateWALEntryResponse\022?\n\006" + - "Replay\022\031.ReplicateWALEntryRequest\032\032.Repl" + - "icateWALEntryResponse\022>\n\rRollWALWriter\022\025" + - ".RollWALWriterRequest\032\026.RollWALWriterRes" + - "ponse\022>\n\rGetServerInfo\022\025.GetServerInfoRe" + - "quest\032\026.GetServerInfoResponse\0225\n\nStopSer", - "ver\022\022.StopServerRequest\032\023.StopServerResp" + - "onse\022M\n\022UpdateFavoredNodes\022\032.UpdateFavor" + - "edNodesRequest\032\033.UpdateFavoredNodesRespo" + - "nseBA\n*org.apache.hadoop.hbase.protobuf." + - "generatedB\013AdminProtosH\001\210\001\001\240\001\001" + "erverStartCode\030\002 \001(\004\032\227\001\n\016RegionOpenInfo\022" + + "\033\n\006region\030\001 \002(\0132\013.RegionInfo\022\037\n\027version_" + + "of_offline_node\030\002 \001(\r\022\"\n\rfavored_nodes\030\003" + + " \003(\0132\013.ServerName\022#\n\033openForDistributedL", + "ogReplay\030\004 \001(\010\"\235\001\n\022OpenRegionResponse\022=\n" + + "\ropening_state\030\001 \003(\0162&.OpenRegionRespons" + + "e.RegionOpeningState\"H\n\022RegionOpeningSta" + + "te\022\n\n\006OPENED\020\000\022\022\n\016ALREADY_OPENED\020\001\022\022\n\016FA" + + "ILED_OPENING\020\002\"\271\001\n\022CloseRegionRequest\022 \n" + + "\006region\030\001 \002(\0132\020.RegionSpecifier\022\037\n\027versi" + + "on_of_closing_node\030\002 \001(\r\022\036\n\020transition_i" + + "n_ZK\030\003 \001(\010:\004true\022\'\n\022destination_server\030\004" + + " \001(\0132\013.ServerName\022\027\n\017serverStartCode\030\005 \001" + + "(\004\"%\n\023CloseRegionResponse\022\016\n\006closed\030\001 \002(", + "\010\"P\n\022FlushRegionRequest\022 \n\006region\030\001 \002(\0132" + + "\020.RegionSpecifier\022\030\n\020if_older_than_ts\030\002 " + + "\001(\004\"?\n\023FlushRegionResponse\022\027\n\017last_flush" + + "_time\030\001 \002(\004\022\017\n\007flushed\030\002 \001(\010\"K\n\022SplitReg" + + "ionRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpeci" + + "fier\022\023\n\013split_point\030\002 \001(\014\"\025\n\023SplitRegion" + + "Response\"W\n\024CompactRegionRequest\022 \n\006regi" + + "on\030\001 \002(\0132\020.RegionSpecifier\022\r\n\005major\030\002 \001(" + + "\010\022\016\n\006family\030\003 \001(\014\"\027\n\025CompactRegionRespon" + + "se\"\262\001\n\031UpdateFavoredNodesRequest\022@\n\013upda", + "te_info\030\001 \003(\0132+.UpdateFavoredNodesReques" + + "t.RegionUpdateInfo\032S\n\020RegionUpdateInfo\022\033" + + "\n\006region\030\001 
\002(\0132\013.RegionInfo\022\"\n\rfavored_n" + + "odes\030\002 \003(\0132\013.ServerName\".\n\032UpdateFavored" + + "NodesResponse\022\020\n\010response\030\001 \001(\r\"v\n\023Merge" + + "RegionsRequest\022\"\n\010region_a\030\001 \002(\0132\020.Regio" + + "nSpecifier\022\"\n\010region_b\030\002 \002(\0132\020.RegionSpe" + + "cifier\022\027\n\010forcible\030\003 \001(\010:\005false\"\026\n\024Merge" + + "RegionsResponse\"X\n\010WALEntry\022\024\n\003key\030\001 \002(\013" + + "2\007.WALKey\022\027\n\017key_value_bytes\030\002 \003(\014\022\035\n\025as", + "sociated_cell_count\030\003 \001(\005\"4\n\030ReplicateWA" + + "LEntryRequest\022\030\n\005entry\030\001 \003(\0132\t.WALEntry\"" + + "\033\n\031ReplicateWALEntryResponse\"\026\n\024RollWALW" + + "riterRequest\"0\n\025RollWALWriterResponse\022\027\n" + + "\017region_to_flush\030\001 \003(\014\"#\n\021StopServerRequ" + + "est\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServerRespons" + + "e\"\026\n\024GetServerInfoRequest\"B\n\nServerInfo\022" + + " \n\013server_name\030\001 \002(\0132\013.ServerName\022\022\n\nweb" + + "ui_port\030\002 \001(\r\"9\n\025GetServerInfoResponse\022 " + + "\n\013server_info\030\001 \002(\0132\013.ServerInfo2\306\007\n\014Adm", + "inService\022>\n\rGetRegionInfo\022\025.GetRegionIn" + + "foRequest\032\026.GetRegionInfoResponse\022;\n\014Get" + + "StoreFile\022\024.GetStoreFileRequest\032\025.GetSto" + + "reFileResponse\022D\n\017GetOnlineRegion\022\027.GetO" + + "nlineRegionRequest\032\030.GetOnlineRegionResp" + + "onse\0225\n\nOpenRegion\022\022.OpenRegionRequest\032\023" + + ".OpenRegionResponse\0228\n\013CloseRegion\022\023.Clo" + + "seRegionRequest\032\024.CloseRegionResponse\0228\n" + + "\013FlushRegion\022\023.FlushRegionRequest\032\024.Flus" + + "hRegionResponse\0228\n\013SplitRegion\022\023.SplitRe", + "gionRequest\032\024.SplitRegionResponse\022>\n\rCom" + + "pactRegion\022\025.CompactRegionRequest\032\026.Comp" + + "actRegionResponse\022;\n\014MergeRegions\022\024.Merg" + + "eRegionsRequest\032\025.MergeRegionsResponse\022J" + + "\n\021ReplicateWALEntry\022\031.ReplicateWALEntryR" + + "equest\032\032.ReplicateWALEntryResponse\022?\n\006Re" + + "play\022\031.ReplicateWALEntryRequest\032\032.Replic" + + "ateWALEntryResponse\022>\n\rRollWALWriter\022\025.R" + + "ollWALWriterRequest\032\026.RollWALWriterRespo" + + "nse\022>\n\rGetServerInfo\022\025.GetServerInfoRequ", + "est\032\026.GetServerInfoResponse\0225\n\nStopServe" + + "r\022\022.StopServerRequest\032\023.StopServerRespon" + + "se\022M\n\022UpdateFavoredNodes\022\032.UpdateFavored" + + "NodesRequest\032\033.UpdateFavoredNodesRespons" + + "eBA\n*org.apache.hadoop.hbase.protobuf.ge" + + "neratedB\013AdminProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -21290,7 +21413,7 @@ public final class AdminProtos { internal_static_OpenRegionRequest_RegionOpenInfo_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_OpenRegionRequest_RegionOpenInfo_descriptor, - new java.lang.String[] { "Region", "VersionOfOfflineNode", "FavoredNodes", }); + new java.lang.String[] { "Region", "VersionOfOfflineNode", "FavoredNodes", "OpenForDistributedLogReplay", }); internal_static_OpenRegionResponse_descriptor = getDescriptor().getMessageTypes().get(7); internal_static_OpenRegionResponse_fieldAccessorTable = new diff --git 
a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java index 0af2a976ff6..9d037f5932e 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java @@ -3230,6 +3230,16 @@ public final class ZooKeeperProtos { * required .ServerName server_name = 2; */ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getServerNameOrBuilder(); + + // optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN]; + /** + * optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN]; + */ + boolean hasMode(); + /** + * optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN]; + */ + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode getMode(); } /** * Protobuf type {@code SplitLogTask} @@ -3312,6 +3322,17 @@ public final class ZooKeeperProtos { bitField0_ |= 0x00000002; break; } + case 24: { + int rawValue = input.readEnum(); + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode value = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode.valueOf(rawValue); + if (value == null) { + unknownFields.mergeVarintField(3, rawValue); + } else { + bitField0_ |= 0x00000004; + mode_ = value; + } + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -3460,6 +3481,97 @@ public final class ZooKeeperProtos { // @@protoc_insertion_point(enum_scope:SplitLogTask.State) } + /** + * Protobuf enum {@code SplitLogTask.RecoveryMode} + */ + public enum RecoveryMode + implements com.google.protobuf.ProtocolMessageEnum { + /** + * UNKNOWN = 0; + */ + UNKNOWN(0, 0), + /** + * LOG_SPLITTING = 1; + */ + LOG_SPLITTING(1, 1), + /** + * LOG_REPLAY = 2; + */ + LOG_REPLAY(2, 2), + ; + + /** + * UNKNOWN = 0; + */ + public static final int UNKNOWN_VALUE = 0; + /** + * LOG_SPLITTING = 1; + */ + public static final int LOG_SPLITTING_VALUE = 1; + /** + * LOG_REPLAY = 2; + */ + public static final int LOG_REPLAY_VALUE = 2; + + + public final int getNumber() { return value; } + + public static RecoveryMode valueOf(int value) { + switch (value) { + case 0: return UNKNOWN; + case 1: return LOG_SPLITTING; + case 2: return LOG_REPLAY; + default: return null; + } + } + + public static com.google.protobuf.Internal.EnumLiteMap + internalGetValueMap() { + return internalValueMap; + } + private static com.google.protobuf.Internal.EnumLiteMap + internalValueMap = + new com.google.protobuf.Internal.EnumLiteMap() { + public RecoveryMode findValueByNumber(int number) { + return RecoveryMode.valueOf(number); + } + }; + + public final com.google.protobuf.Descriptors.EnumValueDescriptor + getValueDescriptor() { + return getDescriptor().getValues().get(index); + } + public final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptorForType() { + return getDescriptor(); + } + public static final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.getDescriptor().getEnumTypes().get(1); + } + + private static final RecoveryMode[] VALUES = values(); + + public static RecoveryMode valueOf( + com.google.protobuf.Descriptors.EnumValueDescriptor desc) { + if (desc.getType() != getDescriptor()) { + throw new 
java.lang.IllegalArgumentException( + "EnumValueDescriptor is not for this type."); + } + return VALUES[desc.getIndex()]; + } + + private final int index; + private final int value; + + private RecoveryMode(int index, int value) { + this.index = index; + this.value = value; + } + + // @@protoc_insertion_point(enum_scope:SplitLogTask.RecoveryMode) + } + private int bitField0_; // required .SplitLogTask.State state = 1; public static final int STATE_FIELD_NUMBER = 1; @@ -3499,9 +3611,26 @@ public final class ZooKeeperProtos { return serverName_; } + // optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN]; + public static final int MODE_FIELD_NUMBER = 3; + private org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode mode_; + /** + * optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN]; + */ + public boolean hasMode() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN]; + */ + public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode getMode() { + return mode_; + } + private void initFields() { state_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State.UNASSIGNED; serverName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance(); + mode_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode.UNKNOWN; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -3533,6 +3662,9 @@ public final class ZooKeeperProtos { if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeMessage(2, serverName_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeEnum(3, mode_.getNumber()); + } getUnknownFields().writeTo(output); } @@ -3550,6 +3682,10 @@ public final class ZooKeeperProtos { size += com.google.protobuf.CodedOutputStream .computeMessageSize(2, serverName_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeEnumSize(3, mode_.getNumber()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -3583,6 +3719,11 @@ public final class ZooKeeperProtos { result = result && getServerName() .equals(other.getServerName()); } + result = result && (hasMode() == other.hasMode()); + if (hasMode()) { + result = result && + (getMode() == other.getMode()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -3604,6 +3745,10 @@ public final class ZooKeeperProtos { hash = (37 * hash) + SERVER_NAME_FIELD_NUMBER; hash = (53 * hash) + getServerName().hashCode(); } + if (hasMode()) { + hash = (37 * hash) + MODE_FIELD_NUMBER; + hash = (53 * hash) + hashEnum(getMode()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -3728,6 +3873,8 @@ public final class ZooKeeperProtos { serverNameBuilder_.clear(); } bitField0_ = (bitField0_ & ~0x00000002); + mode_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode.UNKNOWN; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -3768,6 +3915,10 @@ public final class ZooKeeperProtos { } else { result.serverName_ = serverNameBuilder_.build(); } + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.mode_ = mode_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -3790,6 +3941,9 @@ public final 
class ZooKeeperProtos { if (other.hasServerName()) { mergeServerName(other.getServerName()); } + if (other.hasMode()) { + setMode(other.getMode()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -3982,6 +4136,42 @@ public final class ZooKeeperProtos { return serverNameBuilder_; } + // optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN]; + private org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode mode_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode.UNKNOWN; + /** + * optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN]; + */ + public boolean hasMode() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN]; + */ + public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode getMode() { + return mode_; + } + /** + * optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN]; + */ + public Builder setMode(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000004; + mode_ = value; + onChanged(); + return this; + } + /** + * optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN]; + */ + public Builder clearMode() { + bitField0_ = (bitField0_ & ~0x00000004); + mode_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode.UNKNOWN; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:SplitLogTask) } @@ -9399,29 +9589,32 @@ public final class ZooKeeperProtos { "gionTransition\022\027\n\017event_type_code\030\001 \002(\r\022" + "\023\n\013region_name\030\002 \002(\014\022\023\n\013create_time\030\003 \002(" + "\004\022 \n\013server_name\030\004 \002(\0132\013.ServerName\022\017\n\007p" + - "ayload\030\005 \001(\014\"\231\001\n\014SplitLogTask\022\"\n\005state\030\001" + + "ayload\030\005 \001(\014\"\214\002\n\014SplitLogTask\022\"\n\005state\030\001" + " \002(\0162\023.SplitLogTask.State\022 \n\013server_name", - "\030\002 \002(\0132\013.ServerName\"C\n\005State\022\016\n\nUNASSIGN" + - "ED\020\000\022\t\n\005OWNED\020\001\022\014\n\010RESIGNED\020\002\022\010\n\004DONE\020\003\022" + - "\007\n\003ERR\020\004\"n\n\005Table\022$\n\005state\030\001 \002(\0162\014.Table" + - ".State:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n" + - "\010DISABLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003" + - "\"%\n\017ReplicationPeer\022\022\n\nclusterkey\030\001 \002(\t\"" + - "^\n\020ReplicationState\022&\n\005state\030\001 \002(\0162\027.Rep" + - "licationState.State\"\"\n\005State\022\013\n\007ENABLED\020" + - "\000\022\014\n\010DISABLED\020\001\"+\n\027ReplicationHLogPositi" + - "on\022\020\n\010position\030\001 \002(\003\"%\n\017ReplicationLock\022", - "\022\n\nlock_owner\030\001 \002(\t\"\230\001\n\tTableLock\022\036\n\ntab" + - "le_name\030\001 \001(\0132\n.TableName\022\037\n\nlock_owner\030" + - "\002 \001(\0132\013.ServerName\022\021\n\tthread_id\030\003 \001(\003\022\021\n" + - "\tis_shared\030\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013cre" + - "ate_time\030\006 \001(\003\";\n\017StoreSequenceId\022\023\n\013fam" + - "ily_name\030\001 \002(\014\022\023\n\013sequence_id\030\002 \002(\004\"g\n\026R" + - "egionStoreSequenceIds\022 \n\030last_flushed_se" + - "quence_id\030\001 \002(\004\022+\n\021store_sequence_id\030\002 
\003" + - "(\0132\020.StoreSequenceIdBE\n*org.apache.hadoo" + - "p.hbase.protobuf.generatedB\017ZooKeeperPro", - "tosH\001\210\001\001\240\001\001" + "\030\002 \002(\0132\013.ServerName\0221\n\004mode\030\003 \001(\0162\032.Spli" + + "tLogTask.RecoveryMode:\007UNKNOWN\"C\n\005State\022" + + "\016\n\nUNASSIGNED\020\000\022\t\n\005OWNED\020\001\022\014\n\010RESIGNED\020\002" + + "\022\010\n\004DONE\020\003\022\007\n\003ERR\020\004\">\n\014RecoveryMode\022\013\n\007U" + + "NKNOWN\020\000\022\021\n\rLOG_SPLITTING\020\001\022\016\n\nLOG_REPLA" + + "Y\020\002\"n\n\005Table\022$\n\005state\030\001 \002(\0162\014.Table.Stat" + + "e:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" + + "BLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"%\n\017R" + + "eplicationPeer\022\022\n\nclusterkey\030\001 \002(\t\"^\n\020Re" + + "plicationState\022&\n\005state\030\001 \002(\0162\027.Replicat", + "ionState.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010" + + "DISABLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n" + + "\010position\030\001 \002(\003\"%\n\017ReplicationLock\022\022\n\nlo" + + "ck_owner\030\001 \002(\t\"\230\001\n\tTableLock\022\036\n\ntable_na" + + "me\030\001 \001(\0132\n.TableName\022\037\n\nlock_owner\030\002 \001(\013" + + "2\013.ServerName\022\021\n\tthread_id\030\003 \001(\003\022\021\n\tis_s" + + "hared\030\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013create_t" + + "ime\030\006 \001(\003\";\n\017StoreSequenceId\022\023\n\013family_n" + + "ame\030\001 \002(\014\022\023\n\013sequence_id\030\002 \002(\004\"g\n\026Region" + + "StoreSequenceIds\022 \n\030last_flushed_sequenc", + "e_id\030\001 \002(\004\022+\n\021store_sequence_id\030\002 \003(\0132\020." + + "StoreSequenceIdBE\n*org.apache.hadoop.hba" + + "se.protobuf.generatedB\017ZooKeeperProtosH\001" + + "\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -9457,7 +9650,7 @@ public final class ZooKeeperProtos { internal_static_SplitLogTask_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_SplitLogTask_descriptor, - new java.lang.String[] { "State", "ServerName", }); + new java.lang.String[] { "State", "ServerName", "Mode", }); internal_static_Table_descriptor = getDescriptor().getMessageTypes().get(5); internal_static_Table_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/Admin.proto b/hbase-protocol/src/main/protobuf/Admin.proto index 5b889cd10b5..ecf30f4209d 100644 --- a/hbase-protocol/src/main/protobuf/Admin.proto +++ b/hbase-protocol/src/main/protobuf/Admin.proto @@ -75,6 +75,8 @@ message OpenRegionRequest { required RegionInfo region = 1; optional uint32 version_of_offline_node = 2; repeated ServerName favored_nodes = 3; + // open region for distributedLogReplay + optional bool openForDistributedLogReplay = 4; } } diff --git a/hbase-protocol/src/main/protobuf/ZooKeeper.proto b/hbase-protocol/src/main/protobuf/ZooKeeper.proto index 082e1f75b73..37816daac46 100644 --- a/hbase-protocol/src/main/protobuf/ZooKeeper.proto +++ b/hbase-protocol/src/main/protobuf/ZooKeeper.proto @@ -85,8 +85,14 @@ message SplitLogTask { DONE = 3; ERR = 4; } + enum RecoveryMode { + UNKNOWN = 0; + LOG_SPLITTING = 1; + LOG_REPLAY = 2; + } required State state = 1; required ServerName server_name = 2; + optional RecoveryMode mode = 3 [default = UNKNOWN]; } /** diff 
--git a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java index 67a099446d2..1b5f8b7ad16 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java @@ -18,10 +18,13 @@ package org.apache.hadoop.hbase; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; +import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; import org.apache.hadoop.hbase.util.Bytes; import com.google.protobuf.InvalidProtocolBufferException; @@ -36,49 +39,59 @@ import com.google.protobuf.InvalidProtocolBufferException; public class SplitLogTask { private final ServerName originServer; private final ZooKeeperProtos.SplitLogTask.State state; + private final ZooKeeperProtos.SplitLogTask.RecoveryMode mode; public static class Unassigned extends SplitLogTask { - public Unassigned(final ServerName originServer) { - super(originServer, ZooKeeperProtos.SplitLogTask.State.UNASSIGNED); + public Unassigned(final ServerName originServer, final RecoveryMode mode) { + super(originServer, ZooKeeperProtos.SplitLogTask.State.UNASSIGNED, mode); } } public static class Owned extends SplitLogTask { - public Owned(final ServerName originServer) { - super(originServer, ZooKeeperProtos.SplitLogTask.State.OWNED); + public Owned(final ServerName originServer, final RecoveryMode mode) { + super(originServer, ZooKeeperProtos.SplitLogTask.State.OWNED, mode); } } public static class Resigned extends SplitLogTask { - public Resigned(final ServerName originServer) { - super(originServer, ZooKeeperProtos.SplitLogTask.State.RESIGNED); + public Resigned(final ServerName originServer, final RecoveryMode mode) { + super(originServer, ZooKeeperProtos.SplitLogTask.State.RESIGNED, mode); } } public static class Done extends SplitLogTask { - public Done(final ServerName originServer) { - super(originServer, ZooKeeperProtos.SplitLogTask.State.DONE); + public Done(final ServerName originServer, final RecoveryMode mode) { + super(originServer, ZooKeeperProtos.SplitLogTask.State.DONE, mode); } } public static class Err extends SplitLogTask { - public Err(final ServerName originServer) { - super(originServer, ZooKeeperProtos.SplitLogTask.State.ERR); + public Err(final ServerName originServer, final RecoveryMode mode) { + super(originServer, ZooKeeperProtos.SplitLogTask.State.ERR, mode); } } SplitLogTask(final ZooKeeperProtos.SplitLogTask slt) { - this(ProtobufUtil.toServerName(slt.getServerName()), slt.getState()); + this.originServer = ProtobufUtil.toServerName(slt.getServerName()); + this.state = slt.getState(); + this.mode = (slt.hasMode()) ? 
slt.getMode() : + ZooKeeperProtos.SplitLogTask.RecoveryMode.UNKNOWN; } - SplitLogTask(final ServerName originServer, final ZooKeeperProtos.SplitLogTask.State state) { + SplitLogTask(final ServerName originServer, final ZooKeeperProtos.SplitLogTask.State state, + final ZooKeeperProtos.SplitLogTask.RecoveryMode mode) { this.originServer = originServer; this.state = state; + this.mode = mode; } public ServerName getServerName() { return this.originServer; } + + public ZooKeeperProtos.SplitLogTask.RecoveryMode getMode() { + return this.mode; + } public boolean isUnassigned(final ServerName sn) { return this.originServer.equals(sn) && isUnassigned(); @@ -167,7 +180,8 @@ public class SplitLogTask { // pbs just created. HBaseProtos.ServerName snpb = ProtobufUtil.toServerName(this.originServer); ZooKeeperProtos.SplitLogTask slts = - ZooKeeperProtos.SplitLogTask.newBuilder().setServerName(snpb).setState(this.state).build(); + ZooKeeperProtos.SplitLogTask.newBuilder().setServerName(snpb).setState(this.state). + setMode(this.mode).build(); return ProtobufUtil.prependPBMagic(slts.toByteArray()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 6a5c77e626d..e444a94239d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -102,6 +102,7 @@ import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost; import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RSRpcServices; @@ -711,7 +712,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server { enableMeta(TableName.META_TABLE_NAME); - if (this.distributedLogReplay && (!previouslyFailedMetaRSs.isEmpty())) { + if ((RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode()) + && (!previouslyFailedMetaRSs.isEmpty())) { // replay WAL edits mode need new hbase:meta RS is assigned firstly status.setStatus("replaying log for Meta Region"); this.fileSystemManager.splitMetaLog(previouslyFailedMetaRSs); @@ -740,7 +742,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { } private void splitMetaLogBeforeAssignment(ServerName currentMetaServer) throws IOException { - if (this.distributedLogReplay) { + if (RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode()) { // In log replay mode, we mark hbase:meta region as recovering in ZK Set regions = new HashSet(); regions.add(HRegionInfo.FIRST_META_REGIONINFO); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index 59e93ad9939..153ffcbf479 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.client.Result; import 
org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.fs.HFileSystem; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; @@ -122,14 +123,18 @@ public class MasterFileSystem { FSUtils.setFsDefault(conf, new Path(this.fs.getUri())); // make sure the fs has the same conf fs.setConf(conf); - this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf); // setup the filesystem variable // set up the archived logs path this.oldLogDir = createInitialFileSystemLayout(); HFileSystem.addLocationsOrderInterceptor(conf); - this.splitLogManager = new SplitLogManager(master.getZooKeeper(), - master.getConfiguration(), master, services, - master.getServerName()); + try { + this.splitLogManager = new SplitLogManager(master.getZooKeeper(), + master.getConfiguration(), master, services, + master.getServerName()); + } catch (KeeperException e) { + throw new IOException(e); + } + this.distributedLogReplay = (this.splitLogManager.getRecoveryMode() == RecoveryMode.LOG_REPLAY); } /** @@ -682,4 +687,22 @@ public class MasterFileSystem { } return null; } + + /** + * The function is used in SSH to set recovery mode based on configuration after all outstanding + * log split tasks drained. + * @throws KeeperException + * @throws InterruptedIOException + */ + public void setLogRecoveryMode() throws IOException { + try { + this.splitLogManager.setRecoveryMode(false); + } catch (KeeperException e) { + throw new IOException(e); + } + } + + public RecoveryMode getLogRecoveryMode() { + return this.splitLogManager.getRecoveryMode(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index e209162b5de..31fe2a0bf7d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -60,8 +60,10 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RegionOpeningState; +import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Triple; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -728,8 +730,9 @@ public class ServerManager { " failed because no RPC connection found to this server"); return RegionOpeningState.FAILED_OPENING; } - OpenRegionRequest request = - RequestConverter.buildOpenRegionRequest(server, region, versionOfOfflineNode, favoredNodes); + OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server, + region, versionOfOfflineNode, favoredNodes, + (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode())); try { OpenRegionResponse response = admin.openRegion(null, request); return ResponseConverter.getRegionOpeningState(response); @@ -757,8 +760,8 @@ 
public class ServerManager { return null; } - OpenRegionRequest request = - RequestConverter.buildOpenRegionRequest(regionOpenInfos); + OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(regionOpenInfos, + (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode())); try { OpenRegionResponse response = admin.openRegion(null, request); return ResponseConverter.getRegionOpeningStateList(response); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 4e3bb00596b..b9414cd5db2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -46,16 +46,19 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.regionserver.SplitLogWorker; import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; @@ -138,7 +141,8 @@ public class SplitLogManager extends ZooKeeperListener { */ protected final ReentrantLock recoveringRegionLock = new ReentrantLock(); - final boolean distributedLogReplay; + private volatile RecoveryMode recoveryMode; + private volatile boolean isDrainingDone = false; private final ConcurrentMap tasks = new ConcurrentHashMap(); private TimeoutMonitor timeoutMonitor; @@ -160,9 +164,12 @@ public class SplitLogManager extends ZooKeeperListener { * @param stopper the stoppable in case anything is wrong * @param master the master services * @param serverName the master server name + * @throws KeeperException + * @throws InterruptedIOException */ public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf, - Stoppable stopper, MasterServices master, ServerName serverName) { + Stoppable stopper, MasterServices master, ServerName serverName) + throws InterruptedIOException, KeeperException { this(zkw, conf, stopper, master, serverName, new TaskFinisher() { @Override public Status finish(ServerName workerName, String logfile) { @@ -178,20 +185,20 @@ public class SplitLogManager extends ZooKeeperListener { } /** - * Its OK to construct this object even when region-servers are not online. It - * does lookup the orphan tasks in zk but it doesn't block waiting for them - * to be done. - * + * Its OK to construct this object even when region-servers are not online. It does lookup the + * orphan tasks in zk but it doesn't block waiting for them to be done. 
* @param zkw the ZK watcher * @param conf the HBase configuration * @param stopper the stoppable in case anything is wrong * @param master the master services * @param serverName the master server name * @param tf task finisher + * @throws KeeperException + * @throws InterruptedIOException */ - public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf, - Stoppable stopper, MasterServices master, - ServerName serverName, TaskFinisher tf) { + public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf, Stoppable stopper, + MasterServices master, ServerName serverName, TaskFinisher tf) throws InterruptedIOException, + KeeperException { super(zkw); this.taskFinisher = tf; this.conf = conf; @@ -202,9 +209,12 @@ public class SplitLogManager extends ZooKeeperListener { this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT); this.unassignedTimeout = conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT); - this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(conf); + + // Determine recovery mode + setRecoveryMode(true); + LOG.info("Timeout=" + timeout + ", unassigned timeout=" + unassignedTimeout + - ", distributedLogReplay=" + this.distributedLogReplay); + ", distributedLogReplay=" + (this.recoveryMode == RecoveryMode.LOG_REPLAY)); this.serverName = serverName; this.timeoutMonitor = new TimeoutMonitor( @@ -465,8 +475,7 @@ public class SplitLogManager extends ZooKeeperListener { */ private void removeRecoveringRegionsFromZK(final Set serverNames, Boolean isMetaRecovery) { - - if (!this.distributedLogReplay) { + if (this.recoveryMode != RecoveryMode.LOG_REPLAY) { // the function is only used in WALEdit direct replay mode return; } @@ -494,7 +503,7 @@ public class SplitLogManager extends ZooKeeperListener { if (count == 0 && this.master.isInitialized() && !this.master.getServerManager().areDeadServersInProgress()) { // no splitting work items left - deleteRecoveringRegionZNodes(null); + deleteRecoveringRegionZNodes(watcher, null); // reset lastRecoveringNodeCreationTime because we cleared all recovering znodes at // this point. 
lastRecoveringNodeCreationTime = Long.MAX_VALUE; @@ -550,14 +559,6 @@ public class SplitLogManager extends ZooKeeperListener { void removeStaleRecoveringRegionsFromZK(final Set failedServers) throws KeeperException, InterruptedIOException { - if (!this.distributedLogReplay) { - // remove any regions in recovery from ZK which could happen when we turn the feature on - // and later turn it off - ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode); - // the function is only used in distributedLogReplay mode when master is in initialization - return; - } - Set knownFailedServers = new HashSet(); if (failedServers != null) { for (ServerName tmpServerName : failedServers) { @@ -625,7 +626,7 @@ public class SplitLogManager extends ZooKeeperListener { } } - private void deleteRecoveringRegionZNodes(List regions) { + public static void deleteRecoveringRegionZNodes(ZooKeeperWatcher watcher, List regions) { try { if (regions == null) { // remove all children under /home/recovering-regions @@ -683,7 +684,7 @@ public class SplitLogManager extends ZooKeeperListener { } private void createNode(String path, Long retry_count) { - SplitLogTask slt = new SplitLogTask.Unassigned(serverName); + SplitLogTask slt = new SplitLogTask.Unassigned(serverName, this.recoveryMode); ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(), retry_count); SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet(); return; @@ -858,7 +859,7 @@ public class SplitLogManager extends ZooKeeperListener { task.incarnation++; try { // blocking zk call but this is done from the timeout thread - SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName); + SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName, this.recoveryMode); if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) { LOG.debug("failed to resubmit task " + path + " version changed"); @@ -951,7 +952,7 @@ public class SplitLogManager extends ZooKeeperListener { // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks // therefore this behavior is safe. lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis(); - SplitLogTask slt = new SplitLogTask.Done(this.serverName); + SplitLogTask slt = new SplitLogTask.Done(this.serverName, this.recoveryMode); this.watcher.getRecoverableZooKeeper().getZooKeeper(). 
        create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(),
          Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
@@ -1099,7 +1100,7 @@ public class SplitLogManager extends ZooKeeperListener {
    */
   void markRegionsRecoveringInZK(final ServerName serverName, Set<HRegionInfo> userRegions)
       throws KeeperException, InterruptedIOException {
-    if (userRegions == null || !this.distributedLogReplay) {
+    if (userRegions == null || (this.recoveryMode != RecoveryMode.LOG_REPLAY)) {
       return;
     }
@@ -1242,6 +1243,111 @@ public class SplitLogManager extends ZooKeeperListener {
     }
     return result;
   }
+
+  /**
+   * This function sets the recovery mode, either from outstanding split log tasks created
+   * earlier or from the current configuration setting.
+   * @param isForInitialization whether this is called during master initialization
+   * @throws KeeperException
+   * @throws InterruptedIOException
+   */
+  public void setRecoveryMode(boolean isForInitialization) throws KeeperException,
+      InterruptedIOException {
+    if (this.isDrainingDone) {
+      // when there are no outstanding splitlogtasks after master startup, we already have an
+      // up-to-date recovery mode
+      return;
+    }
+    if (this.watcher == null) {
+      // when the watcher is null (testing code), the recovery mode can only be LOG_SPLITTING
+      this.isDrainingDone = true;
+      this.recoveryMode = RecoveryMode.LOG_SPLITTING;
+      return;
+    }
+    boolean hasSplitLogTask = false;
+    boolean hasRecoveringRegions = false;
+    RecoveryMode previousRecoveryMode = RecoveryMode.UNKNOWN;
+    RecoveryMode recoveryModeInConfig = (isDistributedLogReplay(conf)) ?
+      RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING;
+
+    // First, check if there are outstanding recovering regions
+    List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
+    if (regions != null && !regions.isEmpty()) {
+      hasRecoveringRegions = true;
+      previousRecoveryMode = RecoveryMode.LOG_REPLAY;
+    }
+    if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
+      // Second, check if there are outstanding split log tasks
+      List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
+      if (tasks != null && !tasks.isEmpty()) {
+        hasSplitLogTask = true;
+        if (isForInitialization) {
+          // during initialization, try to get the recovery mode from the splitlogtask
+          for (String task : tasks) {
+            try {
+              byte[] data = ZKUtil.getData(this.watcher,
+                ZKUtil.joinZNode(watcher.splitLogZNode, task));
+              if (data == null) continue;
+              SplitLogTask slt = SplitLogTask.parseFrom(data);
+              previousRecoveryMode = slt.getMode();
+              if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
+                // created by an old code base that doesn't set the recovery mode in splitlogtask;
+                // we can safely set it to LOG_SPLITTING because we're in master initialization code
+                // before SSH is enabled & there are no outstanding recovering regions
+                previousRecoveryMode = RecoveryMode.LOG_SPLITTING;
+              }
+              break;
+            } catch (DeserializationException e) {
+              LOG.warn("Failed to parse data for znode " + task, e);
+            } catch (InterruptedException e) {
+              throw new InterruptedIOException();
+            }
+          }
+        }
+      }
+    }
+
+    synchronized (this) {
+      if (this.isDrainingDone) {
+        return;
+      }
+      if (!hasSplitLogTask && !hasRecoveringRegions) {
+        this.isDrainingDone = true;
+        this.recoveryMode = recoveryModeInConfig;
+        return;
+      } else if (!isForInitialization) {
+        // split log tasks haven't drained yet, keep the existing recovery mode
+        return;
+      }
+
+      if (previousRecoveryMode != RecoveryMode.UNKNOWN) {
+        this.isDrainingDone = (previousRecoveryMode == recoveryModeInConfig);
+        this.recoveryMode = previousRecoveryMode;
+      } else {
+        this.recoveryMode = recoveryModeInConfig;
+      }
+    }
+  }
+
+  public
RecoveryMode getRecoveryMode() { + return this.recoveryMode; + } + + /** + * Returns if distributed log replay is turned on or not + * @param conf + * @return true when distributed log replay is turned on + */ + private boolean isDistributedLogReplay(Configuration conf) { + boolean dlr = conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, + HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG); + int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION); + if (LOG.isDebugEnabled()) { + LOG.debug("Distributed log replay=" + dlr + ", " + HFile.FORMAT_VERSION_KEY + "=" + version); + } + // For distributed log replay, hfile version must be 3 at least; we need tag support. + return dlr && (version >= 3); + } /** * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed(). diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java index b7c14095995..11a4fb2d30b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.DeadServer; import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.util.Threads; import org.apache.zookeeper.KeeperException; @@ -62,10 +63,13 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler { boolean gotException = true; try { AssignmentManager am = this.services.getAssignmentManager(); + this.services.getMasterFileSystem().setLogRecoveryMode(); + boolean distributedLogReplay = + (this.services.getMasterFileSystem().getLogRecoveryMode() == RecoveryMode.LOG_REPLAY); try { if (this.shouldSplitHlog) { LOG.info("Splitting hbase:meta logs for " + serverName); - if (this.distributedLogReplay) { + if (distributedLogReplay) { Set regions = new HashSet(); regions.add(HRegionInfo.FIRST_META_REGIONINFO); this.services.getMasterFileSystem().prepareLogReplay(serverName, regions); @@ -97,7 +101,7 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler { } try { - if (this.shouldSplitHlog && this.distributedLogReplay) { + if (this.shouldSplitHlog && distributedLogReplay) { if (!am.waitOnRegionToClearRegionsInTransition(HRegionInfo.FIRST_META_REGIONINFO, regionAssignmentWaitTimeout)) { // Wait here is to avoid log replay hits current dead server and incur a RPC timeout diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java index fc391af11a0..fcbe4f28e96 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.master.ServerManager; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; +import 
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; import org.apache.hadoop.hbase.zookeeper.ZKAssign; import org.apache.zookeeper.KeeperException; @@ -64,7 +65,6 @@ public class ServerShutdownHandler extends EventHandler { protected final MasterServices services; protected final DeadServer deadServers; protected final boolean shouldSplitHlog; // whether to split HLog or not - protected final boolean distributedLogReplay; protected final int regionAssignmentWaitTimeout; public ServerShutdownHandler(final Server server, final MasterServices services, @@ -86,7 +86,6 @@ public class ServerShutdownHandler extends EventHandler { LOG.warn(this.serverName + " is NOT in deadservers; it should be!"); } this.shouldSplitHlog = shouldSplitHlog; - this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(server.getConfiguration()); this.regionAssignmentWaitTimeout = server.getConfiguration().getInt( HConstants.LOG_REPLAY_WAIT_REGION_TIMEOUT, 15000); } @@ -183,10 +182,16 @@ public class ServerShutdownHandler extends EventHandler { throw new IOException("Server is stopped"); } + // delayed to set recovery mode based on configuration only after all outstanding splitlogtask + // drained + this.services.getMasterFileSystem().setLogRecoveryMode(); + boolean distributedLogReplay = + (this.services.getMasterFileSystem().getLogRecoveryMode() == RecoveryMode.LOG_REPLAY); + try { if (this.shouldSplitHlog) { LOG.info("Splitting logs for " + serverName + " before assignment."); - if (this.distributedLogReplay) { + if (distributedLogReplay) { LOG.info("Mark regions in recovery before assignment."); Set serverNames = new HashSet(); serverNames.add(serverName); @@ -286,7 +291,7 @@ public class ServerShutdownHandler extends EventHandler { throw (InterruptedIOException)new InterruptedIOException().initCause(ie); } - if (this.shouldSplitHlog && this.distributedLogReplay) { + if (this.shouldSplitHlog && distributedLogReplay) { // wait for region assignment completes for (HRegionInfo hri : toAssignRegions) { try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index fdf2cd39222..ea6ee53dabb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -364,9 +364,6 @@ public class HRegionServer extends HasThread implements private RegionServerProcedureManagerHost rspmHost; - // configuration setting on if replay WAL edits directly to another RS - protected final boolean distributedLogReplay; - // Table level lock manager for locking for region operations protected TableLockManager tableLockManager; @@ -447,7 +444,6 @@ public class HRegionServer extends HasThread implements this.startcode = System.currentTimeMillis(); String hostName = rpcServices.isa.getHostName(); serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode); - this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf); // login the zookeeper client principal (if using security) ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file", @@ -631,9 +627,7 @@ public class HRegionServer extends HasThread implements this.abort("Failed to reach zk cluster when creating procedure handler.", e); } // register watcher for recovering regions - 
if(this.distributedLogReplay) { - this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this); - } + this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index ddf655f0022..efbfde1854d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1182,6 +1182,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, + regionServer.serverName)); } } + OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder(); final int regionCount = request.getOpenInfoCount(); final Map htds = @@ -1258,10 +1259,20 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (previous == null) { // check if the region to be opened is marked in recovering state in ZK - if (regionServer.distributedLogReplay - && SplitLogManager.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(), - region.getEncodedName())) { - regionServer.recoveringRegions.put(region.getEncodedName(), null); + if (SplitLogManager.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(), + region.getEncodedName())) { + // check if the current region open is for distributedLogReplay. This check is to support + // rolling restart/upgrade where we want the Master and RS to see the same configuration + if (regionOpenInfo.hasOpenForDistributedLogReplay() + && regionOpenInfo.getOpenForDistributedLogReplay()) { + regionServer.recoveringRegions.put(region.getEncodedName(), null); + } else { + // remove the stale recovering region from ZK when we open the region not for recovery, + // which could happen when distributedLogReplay is turned from on to off. + List<String> tmpRegions = new ArrayList<String>(); + tmpRegions.add(region.getEncodedName()); + SplitLogManager.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(), tmpRegions); + } } // If there is no action in progress, we can submit a specific handler. // Need to pass the expected version in the constructor. 
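The RSRpcServices hunk above reduces to one decision on the region-open path: trust the openForDistributedLogReplay flag carried in the request rather than the region server's local configuration, so that a master and region server that temporarily disagree during a rolling restart/upgrade still behave consistently, and stale recovering znodes are cleaned up once the feature is switched off. A minimal sketch of that decision follows; RegionOpenDecision and RecoveringRegionStore are illustrative stand-in names, not HBase classes, and the real code uses SplitLogManager.isRegionMarkedRecoveringInZK / deleteRecoveringRegionZNodes and regionServer.recoveringRegions.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; see RSRpcServices.openRegion() in the patch for the real logic.
class RegionOpenDecision {
  // encoded region name -> placeholder, mirroring regionServer.recoveringRegions
  // (null until recovery fills in the value); synchronizedMap because null values are stored.
  private final Map<String, Object> recoveringRegions =
      Collections.synchronizedMap(new HashMap<String, Object>());

  interface RecoveringRegionStore {
    boolean isMarkedRecovering(String encodedRegionName);
    void deleteRecoveringRegionZNodes(List<String> encodedRegionNames);
  }

  void onOpenRegion(String encodedRegionName, Boolean openForDistributedLogReplay,
      RecoveringRegionStore zk) {
    if (!zk.isMarkedRecovering(encodedRegionName)) {
      return; // plain open, nothing recovery-related to do
    }
    if (openForDistributedLogReplay != null && openForDistributedLogReplay) {
      // the master asked for LOG_REPLAY: open the region in recovering state
      recoveringRegions.put(encodedRegionName, null);
    } else {
      // the master is not replaying (e.g. distributedLogReplay was turned off),
      // so the recovering znode is stale and gets cleaned up instead
      zk.deleteRecoveringRegionZNodes(Collections.singletonList(encodedRegionName));
    }
  }
}
```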
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java index 1f4ac07124a..6ade09907b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java @@ -35,6 +35,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; @@ -44,6 +45,8 @@ import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.master.SplitLogManager; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler; import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; @@ -125,7 +128,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { final RegionServerServices server, final LastSequenceId sequenceIdChecker) { this(watcher, conf, server, new TaskExecutor() { @Override - public Status exec(String filename, CancelableProgressable p) { + public Status exec(String filename, RecoveryMode mode, CancelableProgressable p) { Path rootdir; FileSystem fs; try { @@ -140,7 +143,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { // encountered a bad non-retry-able persistent error. 
try { if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)), - fs, conf, p, sequenceIdChecker, watcher, server.getCoordinatedStateManager())) { + fs, conf, p, sequenceIdChecker, watcher, server.getCoordinatedStateManager(), mode)) { return Status.PREEMPTED; } } catch (InterruptedIOException iioe) { @@ -174,11 +177,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { try { LOG.info("SplitLogWorker " + this.serverName + " starting"); this.watcher.registerListener(this); - boolean distributedLogReplay = HLogSplitter.isDistributedLogReplay(conf); - if (distributedLogReplay) { - // initialize a new connection for splitlogworker configuration - HConnectionManager.getConnection(conf); - } + // pre-initialize a new connection for splitlogworker configuration + HConnectionManager.getConnection(conf); // wait for master to create the splitLogZnode int res = -1; @@ -301,7 +301,6 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { */ private void grabTask(String path) { Stat stat = new Stat(); - long t = -1; byte[] data; synchronized (grabTaskLock) { currentTask = path; @@ -334,14 +333,15 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { return; } - currentVersion = attemptToOwnTask(true, watcher, serverName, path, stat.getVersion()); + currentVersion = attemptToOwnTask(true, watcher, serverName, path, slt.getMode(), + stat.getVersion()); if (currentVersion < 0) { SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet(); return; } if (ZKSplitLog.isRescanNode(watcher, currentTask)) { - HLogSplitterHandler.endTask(watcher, new SplitLogTask.Done(this.serverName), + HLogSplitterHandler.endTask(watcher, new SplitLogTask.Done(this.serverName, slt.getMode()), SplitLogCounters.tot_wkr_task_acquired_rescan, currentTask, currentVersion); return; } @@ -350,7 +350,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { SplitLogCounters.tot_wkr_task_acquired.incrementAndGet(); getDataSetWatchAsync(); - submitTask(path, currentVersion, this.report_period); + submitTask(path, slt.getMode(), currentVersion, this.report_period); // after a successful submit, sleep a little bit to allow other RSs to grab the rest tasks try { @@ -385,10 +385,10 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { * @return non-negative integer value when task can be owned by current region server otherwise -1 */ protected static int attemptToOwnTask(boolean isFirstTime, ZooKeeperWatcher zkw, - ServerName server, String task, int taskZKVersion) { + ServerName server, String task, RecoveryMode mode, int taskZKVersion) { int latestZKVersion = FAILED_TO_OWN_TASK; try { - SplitLogTask slt = new SplitLogTask.Owned(server); + SplitLogTask slt = new SplitLogTask.Owned(server, mode); Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion); if (stat == null) { LOG.warn("zk.setData() returned null for path " + task); @@ -445,7 +445,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { * @param curTask * @param curTaskZKVersion */ - void submitTask(final String curTask, final int curTaskZKVersion, final int reportPeriod) { + void submitTask(final String curTask, final RecoveryMode mode, final int curTaskZKVersion, + final int reportPeriod) { final MutableInt zkVersion = new MutableInt(curTaskZKVersion); CancelableProgressable reporter = new CancelableProgressable() { @@ -456,8 +457,8 @@ public class 
SplitLogWorker extends ZooKeeperListener implements Runnable { long t = EnvironmentEdgeManager.currentTimeMillis(); if ((t - last_report_at) > reportPeriod) { last_report_at = t; - int latestZKVersion = - attemptToOwnTask(false, watcher, serverName, curTask, zkVersion.intValue()); + int latestZKVersion = attemptToOwnTask(false, watcher, serverName, curTask, mode, + zkVersion.intValue()); if (latestZKVersion < 0) { LOG.warn("Failed to heartbeat the task" + curTask); return false; @@ -468,9 +469,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { } }; - HLogSplitterHandler hsh = - new HLogSplitterHandler(this.server, curTask, zkVersion, reporter, this.tasksInProgress, - this.splitTaskExecutor); + HLogSplitterHandler hsh = new HLogSplitterHandler(this.server, curTask, zkVersion, reporter, + this.tasksInProgress, this.splitTaskExecutor, mode); this.executorService.submit(hsh); } @@ -640,6 +640,6 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { RESIGNED(), PREEMPTED() } - Status exec(String name, CancelableProgressable p); + Status exec(String name, RecoveryMode mode, CancelableProgressable p); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java index 4ac800ef84d..9bfdeed2cdd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.SplitLogManager; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor; import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status; import org.apache.hadoop.hbase.util.CancelableProgressable; @@ -55,11 +56,12 @@ public class HLogSplitterHandler extends EventHandler { private final AtomicInteger inProgressTasks; private final MutableInt curTaskZKVersion; private final TaskExecutor splitTaskExecutor; + private final RecoveryMode mode; public HLogSplitterHandler(final Server server, String curTask, final MutableInt curTaskZKVersion, CancelableProgressable reporter, - AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor) { + AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor, RecoveryMode mode) { super(server, EventType.RS_LOG_REPLAY); this.curTask = curTask; this.wal = ZKSplitLog.getFileName(curTask); @@ -70,16 +72,17 @@ public class HLogSplitterHandler extends EventHandler { this.zkw = server.getZooKeeper(); this.curTaskZKVersion = curTaskZKVersion; this.splitTaskExecutor = splitTaskExecutor; + this.mode = mode; } @Override public void process() throws IOException { long startTime = System.currentTimeMillis(); try { - Status status = this.splitTaskExecutor.exec(wal, reporter); + Status status = this.splitTaskExecutor.exec(wal, mode, reporter); switch (status) { case DONE: - endTask(zkw, new SplitLogTask.Done(this.serverName), + endTask(zkw, new SplitLogTask.Done(this.serverName, this.mode), SplitLogCounters.tot_wkr_task_done, curTask, curTaskZKVersion.intValue()); break; case PREEMPTED: @@ -88,7 +91,7 @@ public class HLogSplitterHandler extends 
EventHandler { break; case ERR: if (server != null && !server.isStopped()) { - endTask(zkw, new SplitLogTask.Err(this.serverName), + endTask(zkw, new SplitLogTask.Err(this.serverName, this.mode), SplitLogCounters.tot_wkr_task_err, curTask, curTaskZKVersion.intValue()); break; } @@ -99,7 +102,7 @@ public class HLogSplitterHandler extends EventHandler { if (server != null && server.isStopped()) { LOG.info("task execution interrupted because worker is exiting " + curTask); } - endTask(zkw, new SplitLogTask.Resigned(this.serverName), + endTask(zkw, new SplitLogTask.Resigned(this.serverName, this.mode), SplitLogCounters.tot_wkr_task_resigned, curTask, curTaskZKVersion.intValue()); break; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index b9f82b463d3..0ce4a64d19e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -94,6 +94,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.LastSequenceId; @@ -172,7 +173,7 @@ public class HLogSplitter { HLogSplitter(Configuration conf, Path rootDir, FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw, - CoordinatedStateManager csm) { + CoordinatedStateManager csm, RecoveryMode mode) { this.conf = HBaseConfiguration.create(conf); String codecClassName = conf .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); @@ -190,7 +191,7 @@ public class HLogSplitter { // a larger minBatchSize may slow down recovery because replay writer has to wait for // enough edits before replaying them this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64); - this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf); + this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode); this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); if (zkw != null && csm != null && this.distributedLogReplay) { @@ -224,9 +225,8 @@ public class HLogSplitter { */ public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs, Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker, - ZooKeeperWatcher zkw, CoordinatedStateManager cp) throws IOException { - HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw, - cp); + ZooKeeperWatcher zkw, CoordinatedStateManager cp, RecoveryMode mode) throws IOException { + HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw, cp, mode); return s.splitLogFile(logfile, reporter); } @@ -240,7 +240,8 @@ public class HLogSplitter { List splits = new ArrayList(); if (logfiles != null && logfiles.length > 0) { for (FileStatus logfile: logfiles) { - HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, null); + HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, null, + 
RecoveryMode.LOG_SPLITTING); if (s.splitLogFile(logfile, null)) { finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf); if (s.outputSink.splits != null) { @@ -1980,20 +1981,4 @@ public class HLogSplitter { return mutations; } - - /** - * Returns if distributed log replay is turned on or not - * @param conf - * @return true when distributed log replay is turned on - */ - public static boolean isDistributedLogReplay(Configuration conf) { - boolean dlr = conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, - HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG); - int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION); - if (LOG.isDebugEnabled()) { - LOG.debug("Distributed log replay=" + dlr + ", " + HFile.FORMAT_VERSION_KEY + "=" + version); - } - // For distributed log replay, hfile version must be 3 at least; we need tag support. - return dlr && (version >= 3); - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java index d7082ed370d..b46ec287d58 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.DataInputBuffer; @@ -120,7 +121,8 @@ public class TestSerialization { @Test public void testSplitLogTask() throws DeserializationException { - SplitLogTask slt = new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")); + SplitLogTask slt = new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), + RecoveryMode.LOG_REPLAY); byte [] bytes = slt.toByteArray(); SplitLogTask sltDeserialized = SplitLogTask.parseFrom(bytes); assertTrue(slt.equals(sltDeserialized)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java index 2e742810efa..9fc05656248 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java @@ -68,7 +68,7 @@ public class TestMultiParallel { private static final byte[] ONE_ROW = Bytes.toBytes("xxx"); private static final byte [][] KEYS = makeKeys(); - private static final int slaves = 3; // also used for testing HTable pool size + private static final int slaves = 5; // also used for testing HTable pool size @BeforeClass public static void beforeClass() throws Exception { ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL); @@ -692,4 +692,4 @@ public class TestMultiParallel { validateEmpty(result); } } -} \ No newline at end of file +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java index f5e8952c0e8..eabb8136545 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java @@ -75,6 
+75,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.regionserver.RegionOpeningState; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -660,10 +661,14 @@ public class TestAssignmentManager { DeadServer deadServers = new DeadServer(); deadServers.add(SERVERNAME_A); // I need a services instance that will return the AM + MasterFileSystem fs = Mockito.mock(MasterFileSystem.class); + Mockito.doNothing().when(fs).setLogRecoveryMode(); + Mockito.when(fs.getLogRecoveryMode()).thenReturn(RecoveryMode.LOG_REPLAY); MasterServices services = Mockito.mock(MasterServices.class); Mockito.when(services.getAssignmentManager()).thenReturn(am); Mockito.when(services.getServerManager()).thenReturn(this.serverManager); Mockito.when(services.getZooKeeper()).thenReturn(this.watcher); + Mockito.when(services.getMasterFileSystem()).thenReturn(fs); ServerShutdownHandler handler = new ServerShutdownHandler(this.server, services, deadServers, SERVERNAME_A, false); am.failoverCleanupDone.set(true); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystem.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystem.java index 2fbb849e898..8f57ee44db0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystem.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystem.java @@ -94,8 +94,8 @@ public class TestMasterFileSystem { // Create a ZKW to use in the test ZooKeeperWatcher zkw = HBaseTestingUtility.getZooKeeperWatcher(UTIL); zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, walPath), - new SplitLogTask.Owned(inRecoveryServerName).toByteArray(), Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); + new SplitLogTask.Owned(inRecoveryServerName, fs.getLogRecoveryMode()).toByteArray(), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); String staleRegionPath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, staleRegion); ZKUtil.createWithParents(zkw, staleRegionPath); String inRecoveringRegionPath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, failedRegion); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java index ed51484244c..ceb6adad36a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -48,16 +48,21 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.Stoppable; import 
org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.master.SplitLogManager.Task; import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.SplitLogWorker; import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -90,6 +95,7 @@ public class TestSplitLogManager { private SplitLogManager slm; private Configuration conf; private int to; + private RecoveryMode mode; private static HBaseTestingUtility TEST_UTIL; @@ -134,6 +140,9 @@ public class TestSplitLogManager { conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to); conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); to = to + 4 * 100; + + this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? + RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); } @After @@ -213,7 +222,7 @@ public class TestSplitLogManager { LOG.info("TestOrphanTaskAcquisition"); String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash"); - SplitLogTask slt = new SplitLogTask.Owned(DUMMY_MASTER); + SplitLogTask slt = new SplitLogTask.Owned(DUMMY_MASTER, this.mode); zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -238,7 +247,7 @@ public class TestSplitLogManager { " startup"); String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash"); //create an unassigned orphan task - SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER); + SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER, this.mode); zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); int version = ZKUtil.checkExists(zkw, tasknode); @@ -274,19 +283,19 @@ public class TestSplitLogManager { final ServerName worker1 = ServerName.valueOf("worker1,1,1"); final ServerName worker2 = ServerName.valueOf("worker2,1,1"); final ServerName worker3 = ServerName.valueOf("worker3,1,1"); - SplitLogTask slt = new SplitLogTask.Owned(worker1); + SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2); int version1 = ZKUtil.checkExists(zkw, tasknode); assertTrue(version1 > version); - slt = new SplitLogTask.Owned(worker2); + slt = new SplitLogTask.Owned(worker2, this.mode); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); waitForCounter(tot_mgr_heartbeat, 1, 2, to/2); waitForCounter(tot_mgr_resubmit, 1, 2, to + to/2); int version2 = ZKUtil.checkExists(zkw, tasknode); assertTrue(version2 > version1); - slt = new SplitLogTask.Owned(worker3); + slt = new SplitLogTask.Owned(worker3, this.mode); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); waitForCounter(tot_mgr_heartbeat, 2, 3, to/2); waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2); @@ -304,7 +313,7 @@ public class TestSplitLogManager { String tasknode = submitTaskAndWait(batch, "foo/1"); int version = ZKUtil.checkExists(zkw, tasknode); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); - SplitLogTask slt = new SplitLogTask.Owned(worker1); + SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode); ZKUtil.setData(zkw, tasknode, 
slt.toByteArray()); waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); waitForCounter(new Expr() { @@ -331,7 +340,7 @@ public class TestSplitLogManager { TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); - SplitLogTask slt = new SplitLogTask.Done(worker1); + SplitLogTask slt = new SplitLogTask.Done(worker1, this.mode); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); synchronized (batch) { while (batch.installed != batch.done) { @@ -352,7 +361,7 @@ public class TestSplitLogManager { String tasknode = submitTaskAndWait(batch, "foo/1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); - SplitLogTask slt = new SplitLogTask.Err(worker1); + SplitLogTask slt = new SplitLogTask.Err(worker1, this.mode); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); synchronized (batch) { @@ -376,7 +385,7 @@ public class TestSplitLogManager { assertEquals(tot_mgr_resubmit.get(), 0); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); assertEquals(tot_mgr_resubmit.get(), 0); - SplitLogTask slt = new SplitLogTask.Resigned(worker1); + SplitLogTask slt = new SplitLogTask.Resigned(worker1, this.mode); assertEquals(tot_mgr_resubmit.get(), 0); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); int version = ZKUtil.checkExists(zkw, tasknode); @@ -399,7 +408,7 @@ public class TestSplitLogManager { // create an orphan task in OWNED state String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); - SplitLogTask slt = new SplitLogTask.Owned(worker1); + SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode); zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -414,7 +423,7 @@ public class TestSplitLogManager { for (int i = 0; i < (3 * to)/100; i++) { Thread.sleep(100); final ServerName worker2 = ServerName.valueOf("worker1,1,1"); - slt = new SplitLogTask.Owned(worker2); + slt = new SplitLogTask.Owned(worker2, this.mode); ZKUtil.setData(zkw, tasknode1, slt.toByteArray()); } @@ -438,7 +447,7 @@ public class TestSplitLogManager { String tasknode = submitTaskAndWait(batch, "foo/1"); int version = ZKUtil.checkExists(zkw, tasknode); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); - SplitLogTask slt = new SplitLogTask.Owned(worker1); + SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); slm.handleDeadWorker(worker1); @@ -463,7 +472,7 @@ public class TestSplitLogManager { String tasknode = submitTaskAndWait(batch, "foo/1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); - SplitLogTask slt = new SplitLogTask.Owned(worker1); + SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); @@ -513,4 +522,25 @@ public class TestSplitLogManager { assertTrue("Recovery regions isn't cleaned", recoveringRegions.isEmpty()); } + + @Test(timeout=60000) + public void testGetPreviousRecoveryMode() throws Exception { + LOG.info("testGetPreviousRecoveryMode"); + SplitLogCounters.resetCounters(); + Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); + testConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); + + 
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), + new SplitLogTask.Unassigned( + ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + slm = new SplitLogManager(zkw, testConf, stopper, master, DUMMY_MASTER); + assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_SPLITTING); + + zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1); + slm.setRecoveryMode(false); + assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_REPLAY); + } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java index 2d94a77dd0b..10f29574b19 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java @@ -132,7 +132,7 @@ public class TestRegionServerNoMaster { ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName()); // first version is '0' AdminProtos.OpenRegionRequest orr = - RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 0, null); + RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 0, null, null); AdminProtos.OpenRegionResponse responseOpen = getRS().rpcServices.openRegion(null, orr); Assert.assertTrue(responseOpen.getOpeningStateCount() == 1); Assert.assertTrue(responseOpen.getOpeningState(0). @@ -251,7 +251,8 @@ public class TestRegionServerNoMaster { // We're sending multiple requests in a row. The region server must handle this nicely. for (int i = 0; i < 10; i++) { - AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 0, null); + AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest( + getRS().getServerName(), hri, 0, null, null); AdminProtos.OpenRegionResponse responseOpen = getRS().rpcServices.openRegion(null, orr); Assert.assertTrue(responseOpen.getOpeningStateCount() == 1); @@ -277,7 +278,7 @@ public class TestRegionServerNoMaster { // fake region to be closing now, need to clear state afterwards getRS().regionsInTransitionInRS.put(hri.getEncodedNameAsBytes(), Boolean.FALSE); AdminProtos.OpenRegionRequest orr = - RequestConverter.buildOpenRegionRequest(sn, hri, 0, null); + RequestConverter.buildOpenRegionRequest(sn, hri, 0, null, null); getRS().rpcServices.openRegion(null, orr); Assert.fail("The closing region should not be opened"); } catch (ServiceException se) { @@ -454,7 +455,8 @@ public class TestRegionServerNoMaster { //actual close closeNoZK(); try { - AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(earlierServerName, hri, 0, null); + AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest( + earlierServerName, hri, 0, null, null); getRS().getRSRpcServices().openRegion(null, orr); Assert.fail("The openRegion should have been rejected"); } catch (ServiceException se) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java index eaf55472368..dcb1e88355c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -31,6 +32,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; @@ -38,6 +40,8 @@ import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorType; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; +import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -64,6 +68,7 @@ public class TestSplitLogWorker { private ZooKeeperWatcher zkw; private SplitLogWorker slw; private ExecutorService executorService; + private RecoveryMode mode; private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems) throws Exception { @@ -98,6 +103,7 @@ public class TestSplitLogWorker { @Before public void setup() throws Exception { TEST_UTIL.startMiniZKCluster(); + Configuration conf = TEST_UTIL.getConfiguration(); zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), "split-log-worker-tests", null); ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode); @@ -112,6 +118,8 @@ public class TestSplitLogWorker { SplitLogCounters.resetCounters(); executorService = new ExecutorService("TestSplitLogWorker"); executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10); + this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
+ RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); } @After @@ -126,7 +134,7 @@ public class TestSplitLogWorker { new SplitLogWorker.TaskExecutor() { @Override - public Status exec(String name, CancelableProgressable p) { + public Status exec(String name, RecoveryMode mode, CancelableProgressable p) { while (true) { try { Thread.sleep(1000); @@ -149,7 +157,8 @@ public class TestSplitLogWorker { final ServerName RS = ServerName.valueOf("rs,1,1"); RegionServerServices mockedRS = getRegionServer(RS); zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS), - new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), Ids.OPEN_ACL_UNSAFE, + new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); SplitLogWorker slw = @@ -184,8 +193,8 @@ public class TestSplitLogWorker { final ServerName SVR1 = ServerName.valueOf("svr1,1,1"); final ServerName SVR2 = ServerName.valueOf("svr2,1,1"); zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT), - new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); + new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); RegionServerServices mockedRS1 = getRegionServer(SVR1); RegionServerServices mockedRS2 = getRegionServer(SVR2); SplitLogWorker slw1 = @@ -227,15 +236,15 @@ public class TestSplitLogWorker { // this time create a task node after starting the splitLogWorker zkw.getRecoverableZooKeeper().create(PATH, - new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); + new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME); assertEquals(1, slw.taskReadySeq); byte [] bytes = ZKUtil.getData(zkw, PATH); SplitLogTask slt = SplitLogTask.parseFrom(bytes); assertTrue(slt.isOwned(SRV)); - slt = new SplitLogTask.Owned(MANAGER); + slt = new SplitLogTask.Owned(MANAGER, this.mode); ZKUtil.setData(zkw, PATH, slt.toByteArray()); waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME); } finally { @@ -258,7 +267,8 @@ public class TestSplitLogWorker { Thread.sleep(100); waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME); - SplitLogTask unassignedManager = new SplitLogTask.Unassigned(MANAGER); + SplitLogTask unassignedManager = + new SplitLogTask.Unassigned(MANAGER, this.mode); zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -272,7 +282,7 @@ public class TestSplitLogWorker { // preempt the first task, have it owned by another worker final ServerName anotherWorker = ServerName.valueOf("another-worker,1,1"); - SplitLogTask slt = new SplitLogTask.Owned(anotherWorker); + SplitLogTask slt = new SplitLogTask.Owned(anotherWorker, this.mode); ZKUtil.setData(zkw, PATH1, slt.toByteArray()); waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME); @@ -298,7 +308,7 @@ public class TestSplitLogWorker { Thread.sleep(100); String task = ZKSplitLog.getEncodedNodeName(zkw, "task"); - SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER); + SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER, this.mode); zkw.getRecoverableZooKeeper().create(task,slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -351,8 +361,8 @@ public class TestSplitLogWorker { for 
(int i = 0; i < maxTasks; i++) { zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i), - new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask); @@ -394,9 +404,8 @@ public class TestSplitLogWorker { for (int i = 0; i < maxTasks; i++) { zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i), - new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), - Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); + new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java index f03c81dadf3..548f4d580fb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java @@ -444,6 +444,7 @@ public class TestSplitTransactionOnCluster { AssignmentManager.TEST_SKIP_SPLIT_HANDLING = false; admin.setBalancerRunning(true, false); cluster.getMaster().setCatalogJanitorEnabled(true); + cluster.startRegionServer(); t.close(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java index d376c29e885..cdf71f68cfa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.EntryBuffers; import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.RegionEntryBuffer; import org.apache.hadoop.hbase.util.Bytes; @@ -111,8 +112,10 @@ public class TestHLogMethods { @Test public void testEntrySink() throws Exception { Configuration conf = new Configuration(); + RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
+ RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); HLogSplitter splitter = new HLogSplitter( - conf, mock(Path.class), mock(FileSystem.class), null, null, null); + conf, mock(Path.class), mock(FileSystem.class), null, null, null, mode); EntryBuffers sink = splitter.new EntryBuffers(1*1024*1024); for (int i = 0; i < 1000; i++) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java index e6495c4e280..dc39415a29b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java @@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry; import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader; @@ -127,6 +128,7 @@ public class TestHLogSplit { private static String ROBBER; private static String ZOMBIE; private static String [] GROUP = new String [] {"supergroup"}; + private RecoveryMode mode; static enum Corruptions { INSERT_GARBAGE_ON_FIRST_LINE, @@ -177,6 +179,8 @@ public class TestHLogSplit { REGIONS.clear(); Collections.addAll(REGIONS, "bbb", "ccc"); InstrumentedSequenceFileLogWriter.activateFailure = false; + this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? + RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); } @After @@ -805,7 +809,7 @@ public class TestHLogSplit { logfiles != null && logfiles.length > 0); // Set up a splitter that will throw an IOE on the output side HLogSplitter logSplitter = new HLogSplitter( - conf, HBASEDIR, fs, null, null, null) { + conf, HBASEDIR, fs, null, null, null, this.mode) { protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf) throws IOException { HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class); @@ -938,7 +942,7 @@ public class TestHLogSplit { try { conf.setInt("hbase.splitlog.report.period", 1000); boolean ret = HLogSplitter.splitLogFile( - HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, null); + HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, null, this.mode); assertFalse("Log splitting should failed", ret); assertTrue(count.get() > 0); } catch (IOException e) { @@ -997,7 +1001,7 @@ public class TestHLogSplit { // Create a splitter that reads and writes the data without touching disk HLogSplitter logSplitter = new HLogSplitter( - localConf, HBASEDIR, fs, null, null, null) { + localConf, HBASEDIR, fs, null, null, null, this.mode) { /* Produce a mock writer that doesn't write anywhere */ protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf) @@ -1282,7 +1286,7 @@ public class TestHLogSplit { logfiles != null && logfiles.length > 0); HLogSplitter logSplitter = new HLogSplitter( - conf, HBASEDIR, fs, null, null, null) { + conf, HBASEDIR, fs, null, null, null, this.mode) { protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf) throws IOException { HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, logfile, conf); diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index ad889c68ad5..ea368df2c79 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; import org.apache.hadoop.hbase.regionserver.FlushRequestListener; @@ -98,6 +99,8 @@ public class TestWALReplay { private Path logDir; private FileSystem fs; private Configuration conf; + private RecoveryMode mode; + @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -128,6 +131,8 @@ public class TestWALReplay { if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) { TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); } + this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? + RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); } @After @@ -873,7 +878,7 @@ public class TestWALReplay { wal.close(); FileStatus[] listStatus = this.fs.listStatus(wal.getDir()); HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0], - this.fs, this.conf, null, null, null, null); + this.fs, this.conf, null, null, null, null, mode); FileStatus[] listStatus1 = this.fs.listStatus( new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(), "recovered.edits")));
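As a closing usage note, the tests above all derive the expected mode with the same configuration ternary, and the isDistributedLogReplay check (moved in this patch from HLogSplitter into SplitLogManager) additionally requires HFile format version 3 for tag support. A small sketch of that mapping, assuming a hypothetical helper class name:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;

// Hypothetical helper, not part of the patch; it restates the mapping used in the code above.
final class RecoveryModeFromConf {
  static RecoveryMode of(Configuration conf) {
    boolean dlr = conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
        HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
    int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
    // distributed log replay needs HFile v3+ (tag support); otherwise fall back to splitting
    return (dlr && version >= 3) ? RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING;
  }
}
```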