diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index ea29e4f41c9..3695e922c6a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -1563,7 +1563,7 @@ public class MetaTableAccessor {
Put put = new Put(regionInfo.getRegionName(), now);
addRegionInfo(put, regionInfo);
if (sn != null) {
- addLocation(put, sn, openSeqNum, regionInfo.getReplicaId());
+ addLocation(put, sn, openSeqNum, -1, regionInfo.getReplicaId());
}
putToMetaTable(connection, put);
LOG.info("Added daughter " + regionInfo.getEncodedName() +
@@ -1600,7 +1600,7 @@ public class MetaTableAccessor {
Delete deleteB = makeDeleteFromRegionInfo(regionB);
// The merged is a new region, openSeqNum = 1 is fine.
- addLocation(putOfMerged, sn, 1, mergedRegion.getReplicaId());
+ addLocation(putOfMerged, sn, 1, -1, mergedRegion.getReplicaId());
// Add empty locations for region replicas of the merged region so that number of replicas can
// be cached whenever the primary region is looked up from meta
@@ -1644,8 +1644,8 @@ public class MetaTableAccessor {
Put putA = makePutFromRegionInfo(splitA);
Put putB = makePutFromRegionInfo(splitB);
- addLocation(putA, sn, 1, splitA.getReplicaId()); //new regions, openSeqNum = 1 is fine.
- addLocation(putB, sn, 1, splitB.getReplicaId());
+ addLocation(putA, sn, 1, -1, splitA.getReplicaId()); //new regions, openSeqNum = 1 is fine.
+ addLocation(putB, sn, 1, -1, splitB.getReplicaId());
// Add empty locations for region replicas of daughters so that number of replicas can be
// cached whenever the primary region is looked up from meta
@@ -1742,13 +1742,16 @@ public class MetaTableAccessor {
*
* @param connection connection we're using
* @param regionInfo region to update location of
+ * @param openSeqNum the latest sequence number obtained when the region was open
* @param sn Server name
+ * @param masterSystemTime wall clock time from master if passed in the open region RPC or -1
* @throws IOException
*/
public static void updateRegionLocation(Connection connection,
- HRegionInfo regionInfo, ServerName sn, long updateSeqNum)
+ HRegionInfo regionInfo, ServerName sn, long openSeqNum,
+ long masterSystemTime)
throws IOException {
- updateLocation(connection, regionInfo, sn, updateSeqNum);
+ updateLocation(connection, regionInfo, sn, openSeqNum, masterSystemTime);
}
/**
@@ -1761,16 +1764,21 @@ public class MetaTableAccessor {
* @param regionInfo region to update location of
* @param sn Server name
* @param openSeqNum the latest sequence number obtained when the region was open
+ * @param masterSystemTime wall clock time from master if passed in the open region RPC or -1
* @throws IOException In particular could throw {@link java.net.ConnectException}
* if the server is down on other end.
*/
private static void updateLocation(final Connection connection,
- HRegionInfo regionInfo, ServerName sn, long openSeqNum)
+ HRegionInfo regionInfo, ServerName sn, long openSeqNum,
+ long masterSystemTime)
throws IOException {
+
+ // use the maximum of what master passed us vs local time.
+ long time = Math.max(EnvironmentEdgeManager.currentTime(), masterSystemTime);
+
// region replicas are kept in the primary region's row
- long time = EnvironmentEdgeManager.currentTime();
Put put = new Put(getMetaKeyForRegion(regionInfo), time);
- addLocation(put, sn, openSeqNum, regionInfo.getReplicaId());
+ addLocation(put, sn, openSeqNum, time, regionInfo.getReplicaId());
putToMetaTable(connection, put);
LOG.info("Updated row " + regionInfo.getRegionNameAsString() +
" with server=" + sn);
@@ -1886,15 +1894,16 @@ public class MetaTableAccessor {
return p;
}
- public static Put addLocation(final Put p, final ServerName sn, long openSeqNum, int replicaId){
- // using regionserver's local time as the timestamp of Put.
- // See: HBASE-11536
- long now = EnvironmentEdgeManager.currentTime();
- p.addImmutable(getCatalogFamily(), getServerColumn(replicaId), now,
+ public static Put addLocation(final Put p, final ServerName sn, long openSeqNum,
+ long time, int replicaId){
+ if (time <= 0) {
+ time = EnvironmentEdgeManager.currentTime();
+ }
+ p.addImmutable(getCatalogFamily(), getServerColumn(replicaId), time,
Bytes.toBytes(sn.getHostAndPort()));
- p.addImmutable(getCatalogFamily(), getStartCodeColumn(replicaId), now,
+ p.addImmutable(getCatalogFamily(), getStartCodeColumn(replicaId), time,
Bytes.toBytes(sn.getStartcode()));
- p.addImmutable(getCatalogFamily(), getSeqNumColumn(replicaId), now,
+ p.addImmutable(getCatalogFamily(), getSeqNumColumn(replicaId), time,
Bytes.toBytes(openSeqNum));
return p;
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
index b0dac2fd289..ff77e515fb5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
@@ -106,6 +106,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionReq
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import com.google.protobuf.ByteString;
@@ -785,6 +786,8 @@ public final class RequestConverter {
if (server != null) {
builder.setServerStartCode(server.getStartcode());
}
+ // send the master's wall clock time as well, so that the RS can refer to it
+ builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
return builder.build();
}
@@ -806,6 +809,7 @@ public final class RequestConverter {
if (server != null) {
builder.setServerStartCode(server.getStartcode());
}
+ builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
return builder.build();
}
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
index a76936d935f..b6c511c2c79 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
@@ -3874,6 +3874,24 @@ public final class AdminProtos {
*
*/
long getServerStartCode();
+
+ // optional uint64 master_system_time = 5;
+ /**
+ * optional uint64 master_system_time = 5;
+ *
+ *
+ * wall clock time from master + *+ */ + boolean hasMasterSystemTime(); + /** + *
optional uint64 master_system_time = 5;
+ *
+ * + * wall clock time from master + *+ */ + long getMasterSystemTime(); } /** * Protobuf type {@code OpenRegionRequest} @@ -3939,6 +3957,11 @@ public final class AdminProtos { serverStartCode_ = input.readUInt64(); break; } + case 40: { + bitField0_ |= 0x00000002; + masterSystemTime_ = input.readUInt64(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -5195,9 +5218,34 @@ public final class AdminProtos { return serverStartCode_; } + // optional uint64 master_system_time = 5; + public static final int MASTER_SYSTEM_TIME_FIELD_NUMBER = 5; + private long masterSystemTime_; + /** + *
optional uint64 master_system_time = 5;
+ *
+ * + * wall clock time from master + *+ */ + public boolean hasMasterSystemTime() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + *
optional uint64 master_system_time = 5;
+ *
+ * + * wall clock time from master + *+ */ + public long getMasterSystemTime() { + return masterSystemTime_; + } + private void initFields() { openInfo_ = java.util.Collections.emptyList(); serverStartCode_ = 0L; + masterSystemTime_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -5223,6 +5271,9 @@ public final class AdminProtos { if (((bitField0_ & 0x00000001) == 0x00000001)) { output.writeUInt64(2, serverStartCode_); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeUInt64(5, masterSystemTime_); + } getUnknownFields().writeTo(output); } @@ -5240,6 +5291,10 @@ public final class AdminProtos { size += com.google.protobuf.CodedOutputStream .computeUInt64Size(2, serverStartCode_); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(5, masterSystemTime_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -5270,6 +5325,11 @@ public final class AdminProtos { result = result && (getServerStartCode() == other.getServerStartCode()); } + result = result && (hasMasterSystemTime() == other.hasMasterSystemTime()); + if (hasMasterSystemTime()) { + result = result && (getMasterSystemTime() + == other.getMasterSystemTime()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -5291,6 +5351,10 @@ public final class AdminProtos { hash = (37 * hash) + SERVERSTARTCODE_FIELD_NUMBER; hash = (53 * hash) + hashLong(getServerStartCode()); } + if (hasMasterSystemTime()) { + hash = (37 * hash) + MASTER_SYSTEM_TIME_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getMasterSystemTime()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -5409,6 +5473,8 @@ public final class AdminProtos { } serverStartCode_ = 0L; bitField0_ = (bitField0_ & ~0x00000002); + masterSystemTime_ = 0L; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -5450,6 +5516,10 @@ public final class AdminProtos { to_bitField0_ |= 0x00000001; } result.serverStartCode_ = serverStartCode_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000002; + } + result.masterSystemTime_ = masterSystemTime_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -5495,6 +5565,9 @@ public final class AdminProtos { if (other.hasServerStartCode()) { setServerStartCode(other.getServerStartCode()); } + if (other.hasMasterSystemTime()) { + setMasterSystemTime(other.getMasterSystemTime()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -5817,6 +5890,55 @@ public final class AdminProtos { return this; } + // optional uint64 master_system_time = 5; + private long masterSystemTime_ ; + /** + *
optional uint64 master_system_time = 5;
+ *
+ * + * wall clock time from master + *+ */ + public boolean hasMasterSystemTime() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + *
optional uint64 master_system_time = 5;
+ *
+ * + * wall clock time from master + *+ */ + public long getMasterSystemTime() { + return masterSystemTime_; + } + /** + *
optional uint64 master_system_time = 5;
+ *
+ * + * wall clock time from master + *+ */ + public Builder setMasterSystemTime(long value) { + bitField0_ |= 0x00000004; + masterSystemTime_ = value; + onChanged(); + return this; + } + /** + *
optional uint64 master_system_time = 5;
+ *
+ * + * wall clock time from master + *+ */ + public Builder clearMasterSystemTime() { + bitField0_ = (bitField0_ & ~0x00000004); + masterSystemTime_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:OpenRegionRequest) } @@ -23249,87 +23371,88 @@ public final class AdminProtos { "FileResponse\022\022\n\nstore_file\030\001 \003(\t\"\030\n\026GetO" + "nlineRegionRequest\";\n\027GetOnlineRegionRes" + "ponse\022 \n\013region_info\030\001 \003(\0132\013.RegionInfo\"" + - "\374\001\n\021OpenRegionRequest\0224\n\topen_info\030\001 \003(\013" + + "\230\002\n\021OpenRegionRequest\0224\n\topen_info\030\001 \003(\013" + "2!.OpenRegionRequest.RegionOpenInfo\022\027\n\017s" + - "erverStartCode\030\002 \001(\004\032\227\001\n\016RegionOpenInfo\022" + - "\033\n\006region\030\001 \002(\0132\013.RegionInfo\022\037\n\027version_" + - "of_offline_node\030\002 \001(\r\022\"\n\rfavored_nodes\030\003" + - " \003(\0132\013.ServerName\022#\n\033openForDistributedL", - "ogReplay\030\004 \001(\010\"\235\001\n\022OpenRegionResponse\022=\n" + - "\ropening_state\030\001 \003(\0162&.OpenRegionRespons" + - "e.RegionOpeningState\"H\n\022RegionOpeningSta" + - "te\022\n\n\006OPENED\020\000\022\022\n\016ALREADY_OPENED\020\001\022\022\n\016FA" + - "ILED_OPENING\020\002\"6\n\023WarmupRegionRequest\022\037\n" + - "\nregionInfo\030\001 \002(\0132\013.RegionInfo\"\026\n\024Warmup" + - "RegionResponse\"\271\001\n\022CloseRegionRequest\022 \n" + - "\006region\030\001 \002(\0132\020.RegionSpecifier\022\037\n\027versi" + - "on_of_closing_node\030\002 \001(\r\022\036\n\020transition_i" + - "n_ZK\030\003 \001(\010:\004true\022\'\n\022destination_server\030\004", - " \001(\0132\013.ServerName\022\027\n\017serverStartCode\030\005 \001" + - "(\004\"%\n\023CloseRegionResponse\022\016\n\006closed\030\001 \002(" + - "\010\"p\n\022FlushRegionRequest\022 \n\006region\030\001 \002(\0132" + - "\020.RegionSpecifier\022\030\n\020if_older_than_ts\030\002 " + - "\001(\004\022\036\n\026write_flush_wal_marker\030\003 \001(\010\"_\n\023F" + - "lushRegionResponse\022\027\n\017last_flush_time\030\001 " + - "\002(\004\022\017\n\007flushed\030\002 \001(\010\022\036\n\026wrote_flush_wal_" + - "marker\030\003 \001(\010\"K\n\022SplitRegionRequest\022 \n\006re" + - "gion\030\001 \002(\0132\020.RegionSpecifier\022\023\n\013split_po" + - "int\030\002 \001(\014\"\025\n\023SplitRegionResponse\"W\n\024Comp", - "actRegionRequest\022 \n\006region\030\001 \002(\0132\020.Regio" + - "nSpecifier\022\r\n\005major\030\002 \001(\010\022\016\n\006family\030\003 \001(" + - "\014\"\027\n\025CompactRegionResponse\"\262\001\n\031UpdateFav" + - "oredNodesRequest\022@\n\013update_info\030\001 \003(\0132+." + - "UpdateFavoredNodesRequest.RegionUpdateIn" + - "fo\032S\n\020RegionUpdateInfo\022\033\n\006region\030\001 \002(\0132\013" + - ".RegionInfo\022\"\n\rfavored_nodes\030\002 \003(\0132\013.Ser" + - "verName\".\n\032UpdateFavoredNodesResponse\022\020\n" + - "\010response\030\001 \001(\r\"v\n\023MergeRegionsRequest\022\"" + - "\n\010region_a\030\001 \002(\0132\020.RegionSpecifier\022\"\n\010re", - "gion_b\030\002 \002(\0132\020.RegionSpecifier\022\027\n\010forcib" + - "le\030\003 \001(\010:\005false\"\026\n\024MergeRegionsResponse\"" + - "X\n\010WALEntry\022\024\n\003key\030\001 \002(\0132\007.WALKey\022\027\n\017key" + - "_value_bytes\030\002 \003(\014\022\035\n\025associated_cell_co" + - "unt\030\003 \001(\005\"4\n\030ReplicateWALEntryRequest\022\030\n" + - "\005entry\030\001 \003(\0132\t.WALEntry\"\033\n\031ReplicateWALE" + - "ntryResponse\"\026\n\024RollWALWriterRequest\"0\n\025" + - "RollWALWriterResponse\022\027\n\017region_to_flush" + - "\030\001 \003(\014\"#\n\021StopServerRequest\022\016\n\006reason\030\001 " + - "\002(\t\"\024\n\022StopServerResponse\"\026\n\024GetServerIn", - "foRequest\"B\n\nServerInfo\022 \n\013server_name\030\001" + - " \002(\0132\013.ServerName\022\022\n\nwebui_port\030\002 \001(\r\"9\n" + - "\025GetServerInfoResponse\022 \n\013server_info\030\001 " + - "\002(\0132\013.ServerInfo\"\034\n\032UpdateConfigurationR" + - "equest\"\035\n\033UpdateConfigurationResponse2\325\010" + - "\n\014AdminService\022>\n\rGetRegionInfo\022\025.GetReg" + - "ionInfoRequest\032\026.GetRegionInfoResponse\022;" + - "\n\014GetStoreFile\022\024.GetStoreFileRequest\032\025.G" + - "etStoreFileResponse\022D\n\017GetOnlineRegion\022\027" + - ".GetOnlineRegionRequest\032\030.GetOnlineRegio", - "nResponse\0225\n\nOpenRegion\022\022.OpenRegionRequ" + - "est\032\023.OpenRegionResponse\022;\n\014WarmupRegion" + - "\022\024.WarmupRegionRequest\032\025.WarmupRegionRes" + - "ponse\0228\n\013CloseRegion\022\023.CloseRegionReques" + - "t\032\024.CloseRegionResponse\0228\n\013FlushRegion\022\023" + - ".FlushRegionRequest\032\024.FlushRegionRespons" + - "e\0228\n\013SplitRegion\022\023.SplitRegionRequest\032\024." + - "SplitRegionResponse\022>\n\rCompactRegion\022\025.C" + - "ompactRegionRequest\032\026.CompactRegionRespo" + - "nse\022;\n\014MergeRegions\022\024.MergeRegionsReques", - "t\032\025.MergeRegionsResponse\022J\n\021ReplicateWAL" + - "Entry\022\031.ReplicateWALEntryRequest\032\032.Repli" + - "cateWALEntryResponse\022?\n\006Replay\022\031.Replica" + - "teWALEntryRequest\032\032.ReplicateWALEntryRes" + - "ponse\022>\n\rRollWALWriter\022\025.RollWALWriterRe" + - "quest\032\026.RollWALWriterResponse\022>\n\rGetServ" + - "erInfo\022\025.GetServerInfoRequest\032\026.GetServe" + - "rInfoResponse\0225\n\nStopServer\022\022.StopServer" + - "Request\032\023.StopServerResponse\022M\n\022UpdateFa" + - "voredNodes\022\032.UpdateFavoredNodesRequest\032\033", - ".UpdateFavoredNodesResponse\022P\n\023UpdateCon" + - "figuration\022\033.UpdateConfigurationRequest\032" + - "\034.UpdateConfigurationResponseBA\n*org.apa" + - "che.hadoop.hbase.protobuf.generatedB\013Adm" + - "inProtosH\001\210\001\001\240\001\001" + "erverStartCode\030\002 \001(\004\022\032\n\022master_system_ti" + + "me\030\005 \001(\004\032\227\001\n\016RegionOpenInfo\022\033\n\006region\030\001 " + + "\002(\0132\013.RegionInfo\022\037\n\027version_of_offline_n" + + "ode\030\002 \001(\r\022\"\n\rfavored_nodes\030\003 \003(\0132\013.Serve", + "rName\022#\n\033openForDistributedLogReplay\030\004 \001" + + "(\010\"\235\001\n\022OpenRegionResponse\022=\n\ropening_sta" + + "te\030\001 \003(\0162&.OpenRegionResponse.RegionOpen" + + "ingState\"H\n\022RegionOpeningState\022\n\n\006OPENED" + + "\020\000\022\022\n\016ALREADY_OPENED\020\001\022\022\n\016FAILED_OPENING" + + "\020\002\"6\n\023WarmupRegionRequest\022\037\n\nregionInfo\030" + + "\001 \002(\0132\013.RegionInfo\"\026\n\024WarmupRegionRespon" + + "se\"\271\001\n\022CloseRegionRequest\022 \n\006region\030\001 \002(" + + "\0132\020.RegionSpecifier\022\037\n\027version_of_closin" + + "g_node\030\002 \001(\r\022\036\n\020transition_in_ZK\030\003 \001(\010:\004", + "true\022\'\n\022destination_server\030\004 \001(\0132\013.Serve" + + "rName\022\027\n\017serverStartCode\030\005 \001(\004\"%\n\023CloseR" + + "egionResponse\022\016\n\006closed\030\001 \002(\010\"p\n\022FlushRe" + + "gionRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpec" + + "ifier\022\030\n\020if_older_than_ts\030\002 \001(\004\022\036\n\026write" + + "_flush_wal_marker\030\003 \001(\010\"_\n\023FlushRegionRe" + + "sponse\022\027\n\017last_flush_time\030\001 \002(\004\022\017\n\007flush" + + "ed\030\002 \001(\010\022\036\n\026wrote_flush_wal_marker\030\003 \001(\010" + + "\"K\n\022SplitRegionRequest\022 \n\006region\030\001 \002(\0132\020" + + ".RegionSpecifier\022\023\n\013split_point\030\002 \001(\014\"\025\n", + "\023SplitRegionResponse\"W\n\024CompactRegionReq" + + "uest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\r" + + "\n\005major\030\002 \001(\010\022\016\n\006family\030\003 \001(\014\"\027\n\025Compact" + + "RegionResponse\"\262\001\n\031UpdateFavoredNodesReq" + + "uest\022@\n\013update_info\030\001 \003(\0132+.UpdateFavore" + + "dNodesRequest.RegionUpdateInfo\032S\n\020Region" + + "UpdateInfo\022\033\n\006region\030\001 \002(\0132\013.RegionInfo\022" + + "\"\n\rfavored_nodes\030\002 \003(\0132\013.ServerName\".\n\032U" + + "pdateFavoredNodesResponse\022\020\n\010response\030\001 " + + "\001(\r\"v\n\023MergeRegionsRequest\022\"\n\010region_a\030\001", + " \002(\0132\020.RegionSpecifier\022\"\n\010region_b\030\002 \002(\013" + + "2\020.RegionSpecifier\022\027\n\010forcible\030\003 \001(\010:\005fa" + + "lse\"\026\n\024MergeRegionsResponse\"X\n\010WALEntry\022" + + "\024\n\003key\030\001 \002(\0132\007.WALKey\022\027\n\017key_value_bytes" + + "\030\002 \003(\014\022\035\n\025associated_cell_count\030\003 \001(\005\"4\n" + + "\030ReplicateWALEntryRequest\022\030\n\005entry\030\001 \003(\013" + + "2\t.WALEntry\"\033\n\031ReplicateWALEntryResponse" + + "\"\026\n\024RollWALWriterRequest\"0\n\025RollWALWrite" + + "rResponse\022\027\n\017region_to_flush\030\001 \003(\014\"#\n\021St" + + "opServerRequest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopS", + "erverResponse\"\026\n\024GetServerInfoRequest\"B\n" + + "\nServerInfo\022 \n\013server_name\030\001 \002(\0132\013.Serve" + + "rName\022\022\n\nwebui_port\030\002 \001(\r\"9\n\025GetServerIn" + + "foResponse\022 \n\013server_info\030\001 \002(\0132\013.Server" + + "Info\"\034\n\032UpdateConfigurationRequest\"\035\n\033Up" + + "dateConfigurationResponse2\325\010\n\014AdminServi" + + "ce\022>\n\rGetRegionInfo\022\025.GetRegionInfoReque" + + "st\032\026.GetRegionInfoResponse\022;\n\014GetStoreFi" + + "le\022\024.GetStoreFileRequest\032\025.GetStoreFileR" + + "esponse\022D\n\017GetOnlineRegion\022\027.GetOnlineRe", + "gionRequest\032\030.GetOnlineRegionResponse\0225\n" + + "\nOpenRegion\022\022.OpenRegionRequest\032\023.OpenRe" + + "gionResponse\022;\n\014WarmupRegion\022\024.WarmupReg" + + "ionRequest\032\025.WarmupRegionResponse\0228\n\013Clo" + + "seRegion\022\023.CloseRegionRequest\032\024.CloseReg" + + "ionResponse\0228\n\013FlushRegion\022\023.FlushRegion" + + "Request\032\024.FlushRegionResponse\0228\n\013SplitRe" + + "gion\022\023.SplitRegionRequest\032\024.SplitRegionR" + + "esponse\022>\n\rCompactRegion\022\025.CompactRegion" + + "Request\032\026.CompactRegionResponse\022;\n\014Merge", + "Regions\022\024.MergeRegionsRequest\032\025.MergeReg" + + "ionsResponse\022J\n\021ReplicateWALEntry\022\031.Repl" + + "icateWALEntryRequest\032\032.ReplicateWALEntry" + + "Response\022?\n\006Replay\022\031.ReplicateWALEntryRe" + + "quest\032\032.ReplicateWALEntryResponse\022>\n\rRol" + + "lWALWriter\022\025.RollWALWriterRequest\032\026.Roll" + + "WALWriterResponse\022>\n\rGetServerInfo\022\025.Get" + + "ServerInfoRequest\032\026.GetServerInfoRespons" + + "e\0225\n\nStopServer\022\022.StopServerRequest\032\023.St" + + "opServerResponse\022M\n\022UpdateFavoredNodes\022\032", + ".UpdateFavoredNodesRequest\032\033.UpdateFavor" + + "edNodesResponse\022P\n\023UpdateConfiguration\022\033" + + ".UpdateConfigurationRequest\032\034.UpdateConf" + + "igurationResponseBA\n*org.apache.hadoop.h" + + "base.protobuf.generatedB\013AdminProtosH\001\210\001" + + "\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -23377,7 +23500,7 @@ public final class AdminProtos { internal_static_OpenRegionRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_OpenRegionRequest_descriptor, - new java.lang.String[] { "OpenInfo", "ServerStartCode", }); + new java.lang.String[] { "OpenInfo", "ServerStartCode", "MasterSystemTime", }); internal_static_OpenRegionRequest_RegionOpenInfo_descriptor = internal_static_OpenRegionRequest_descriptor.getNestedTypes().get(0); internal_static_OpenRegionRequest_RegionOpenInfo_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/Admin.proto b/hbase-protocol/src/main/protobuf/Admin.proto index 1df0958ac76..550f0b88b2a 100644 --- a/hbase-protocol/src/main/protobuf/Admin.proto +++ b/hbase-protocol/src/main/protobuf/Admin.proto @@ -70,6 +70,8 @@ message OpenRegionRequest { repeated RegionOpenInfo open_info = 1; // the intended server for this RPC. optional uint64 serverStartCode = 2; + // wall clock time from master + optional uint64 master_system_time = 5; message RegionOpenInfo { required RegionInfo region = 1; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java index ba76115c3da..62b7333494e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java @@ -198,7 +198,7 @@ public class RegionStateStore { if (openSeqNum >= 0) { Preconditions.checkArgument(state == State.OPEN && serverName != null, "Open region should be on a server"); - MetaTableAccessor.addLocation(put, serverName, openSeqNum, replicaId); + MetaTableAccessor.addLocation(put, serverName, openSeqNum, -1, replicaId); info.append("&openSeqNum=").append(openSeqNum); info.append("&server=").append(serverName); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 07cb8ee5540..53a109414a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1875,6 +1875,14 @@ public class HRegionServer extends HasThread implements @Override public void postOpenDeployTasks(final Region r) throws KeeperException, IOException { + postOpenDeployTasks(new PostOpenDeployContext(r, -1)); + } + + @Override + public void postOpenDeployTasks(final PostOpenDeployContext context) + throws KeeperException, IOException { + Region r = context.getRegion(); + long masterSystemTime = context.getMasterSystemTime(); Preconditions.checkArgument(r instanceof HRegion, "r must be an HRegion"); rpcServices.checkOpen(); LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString()); @@ -1896,8 +1904,8 @@ public class HRegionServer extends HasThread implements updateRecoveringRegionLastFlushedSequenceId(r); // Notify master - if (!reportRegionStateTransition( - TransitionCode.OPENED, openSeqNum, r.getRegionInfo())) { + if (!reportRegionStateTransition(new RegionStateTransitionContext( + TransitionCode.OPENED, openSeqNum, masterSystemTime, r.getRegionInfo()))) { throw new IOException("Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString()); } @@ -1915,6 +1923,17 @@ public class HRegionServer extends HasThread implements @Override public boolean reportRegionStateTransition( TransitionCode code, long openSeqNum, HRegionInfo... hris) { + return reportRegionStateTransition( + new RegionStateTransitionContext(code, HConstants.NO_SEQNUM, -1, hris)); + } + + @Override + public boolean reportRegionStateTransition(final RegionStateTransitionContext context) { + TransitionCode code = context.getCode(); + long openSeqNum = context.getOpenSeqNum(); + long masterSystemTime = context.getMasterSystemTime(); + HRegionInfo[] hris = context.getHris(); + if (TEST_SKIP_REPORTING_TRANSITION) { // This is for testing only in case there is no master // to handle the region transition report at all. @@ -1931,7 +1950,7 @@ public class HRegionServer extends HasThread implements } else { try { MetaTableAccessor.updateRegionLocation(clusterConnection, - hris[0], serverName, openSeqNum); + hris[0], serverName, openSeqNum, masterSystemTime); } catch (IOException e) { LOG.info("Failed to update meta", e); return false; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 6491a5ce717..9364162657a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1405,6 +1405,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } } + + long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1; + for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) { final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion()); HTableDescriptor htd; @@ -1473,12 +1476,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // Need to pass the expected version in the constructor. if (region.isMetaRegion()) { regionServer.service.submit(new OpenMetaHandler( - regionServer, regionServer, region, htd)); + regionServer, regionServer, region, htd, masterSystemTime)); } else { regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(), regionOpenInfo.getFavoredNodesList()); regionServer.service.submit(new OpenRegionHandler( - regionServer, regionServer, region, htd)); + regionServer, regionServer, region, htd, masterSystemTime)); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index eaffa380e08..cd4816c3a31 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -79,6 +79,36 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi */ RegionServerQuotaManager getRegionServerQuotaManager(); + /** + * Context for postOpenDeployTasks(). + */ + class PostOpenDeployContext { + private final Region region; + private final long masterSystemTime; + + @InterfaceAudience.Private + public PostOpenDeployContext(Region region, long masterSystemTime) { + this.region = region; + this.masterSystemTime = masterSystemTime; + } + public Region getRegion() { + return region; + } + public long getMasterSystemTime() { + return masterSystemTime; + } + } + + /** + * Tasks to perform after region open to complete deploy of region on + * regionserver + * + * @param context the context + * @throws KeeperException + * @throws IOException + */ + void postOpenDeployTasks(final PostOpenDeployContext context) throws KeeperException, IOException; + /** * Tasks to perform after region open to complete deploy of region on * regionserver @@ -86,17 +116,56 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi * @param r Region to open. * @throws KeeperException * @throws IOException + * @deprecated use {@link #postOpenDeployTasks(PostOpenDeployContext)} */ + @Deprecated void postOpenDeployTasks(final Region r) throws KeeperException, IOException; + class RegionStateTransitionContext { + private final TransitionCode code; + private final long openSeqNum; + private final long masterSystemTime; + private final HRegionInfo[] hris; + + @InterfaceAudience.Private + public RegionStateTransitionContext(TransitionCode code, long openSeqNum, long masterSystemTime, + HRegionInfo... hris) { + this.code = code; + this.openSeqNum = openSeqNum; + this.masterSystemTime = masterSystemTime; + this.hris = hris; + } + public TransitionCode getCode() { + return code; + } + public long getOpenSeqNum() { + return openSeqNum; + } + public long getMasterSystemTime() { + return masterSystemTime; + } + public HRegionInfo[] getHris() { + return hris; + } + } + /** * Notify master that a handler requests to change a region state */ + boolean reportRegionStateTransition(final RegionStateTransitionContext context); + + /** + * Notify master that a handler requests to change a region state + * @deprecated use {@link #reportRegionStateTransition(RegionStateTransitionContext)} + */ + @Deprecated boolean reportRegionStateTransition(TransitionCode code, long openSeqNum, HRegionInfo... hris); /** * Notify master that a handler requests to change a region state + * @deprecated use {@link #reportRegionStateTransition(RegionStateTransitionContext)} */ + @Deprecated boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris); /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java index 3b96a9ee541..e49b164bc38 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java @@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices; public class OpenMetaHandler extends OpenRegionHandler { public OpenMetaHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo, - final HTableDescriptor htd) { - super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_META); + final HTableDescriptor htd, long masterSystemTime) { + super(server, rsServices, regionInfo, htd, masterSystemTime, EventType.M_RS_OPEN_META); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java index ecf066506a9..f4b52d637de 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext; import org.apache.hadoop.hbase.util.CancelableProgressable; /** * Handles opening of a region on a region server. @@ -47,20 +48,22 @@ public class OpenRegionHandler extends EventHandler { private final HRegionInfo regionInfo; private final HTableDescriptor htd; + private final long masterSystemTime; public OpenRegionHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo, - HTableDescriptor htd) { - this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION); + HTableDescriptor htd, long masterSystemTime) { + this(server, rsServices, regionInfo, htd, masterSystemTime, EventType.M_RS_OPEN_REGION); } protected OpenRegionHandler(final Server server, final RegionServerServices rsServices, final HRegionInfo regionInfo, - final HTableDescriptor htd, EventType eventType) { + final HTableDescriptor htd, long masterSystemTime, EventType eventType) { super(server, eventType); this.rsServices = rsServices; this.regionInfo = regionInfo; this.htd = htd; + this.masterSystemTime = masterSystemTime; } public HRegionInfo getRegionInfo() { @@ -105,7 +108,7 @@ public class OpenRegionHandler extends EventHandler { return; } - if (!updateMeta(region) || this.server.isStopped() || + if (!updateMeta(region, masterSystemTime) || this.server.isStopped() || this.rsServices.isStopping()) { return; } @@ -167,7 +170,7 @@ public class OpenRegionHandler extends EventHandler { * state meantime so master doesn't timeout our region-in-transition. * Caller must cleanup region if this fails. */ - boolean updateMeta(final HRegion r) { + boolean updateMeta(final HRegion r, long masterSystemTime) { if (this.server.isStopped() || this.rsServices.isStopping()) { return false; } @@ -175,7 +178,7 @@ public class OpenRegionHandler extends EventHandler { // Else, wait. final AtomicBoolean signaller = new AtomicBoolean(false); PostOpenDeployTasksThread t = new PostOpenDeployTasksThread(r, - this.server, this.rsServices, signaller); + this.server, this.rsServices, signaller, masterSystemTime); t.start(); // Post open deploy task: // meta => update meta location in ZK @@ -225,20 +228,23 @@ public class OpenRegionHandler extends EventHandler { private final RegionServerServices services; private final HRegion region; private final AtomicBoolean signaller; + private final long masterSystemTime; PostOpenDeployTasksThread(final HRegion region, final Server server, - final RegionServerServices services, final AtomicBoolean signaller) { + final RegionServerServices services, final AtomicBoolean signaller, long masterSystemTime) { super("PostOpenDeployTasks:" + region.getRegionInfo().getEncodedName()); this.setDaemon(true); this.server = server; this.services = services; this.region = region; this.signaller = signaller; + this.masterSystemTime = masterSystemTime; } + @Override public void run() { try { - this.services.postOpenDeployTasks(this.region); + this.services.postOpenDeployTasks(new PostOpenDeployContext(region, masterSystemTime)); } catch (Throwable e) { String msg = "Exception running postOpenDeployTasks; region=" + this.region.getRegionInfo().getEncodedName(); @@ -278,6 +284,7 @@ public class OpenRegionHandler extends EventHandler { this.server.getConfiguration(), this.rsServices, new CancelableProgressable() { + @Override public boolean progress() { if (!isRegionStillOpening()) { LOG.warn("Open region aborted since it isn't opening any more"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java index 7de7af8f143..e681789cc96 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java @@ -149,7 +149,7 @@ public class HBaseFsckRepair { * (default 120s) to close the region. This bypasses the active hmaster. */ @SuppressWarnings("deprecation") - public static void closeRegionSilentlyAndWait(HConnection connection, + public static void closeRegionSilentlyAndWait(HConnection connection, ServerName server, HRegionInfo region) throws IOException, InterruptedException { long timeout = connection.getConfiguration() .getLong("hbase.hbck.close.timeout", 120000); @@ -174,7 +174,7 @@ public class HBaseFsckRepair { // see the additional replicas when it is asked to assign. The // final value of these columns will be different and will be updated // by the actual regionservers that start hosting the respective replicas - MetaTableAccessor.addLocation(put, sn, sn.getStartcode(), i); + MetaTableAccessor.addLocation(put, sn, sn.getStartcode(), -1, i); } } meta.put(put); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index 810ab9089f4..c126b19a764 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -119,6 +119,12 @@ public class MockRegionServerServices implements RegionServerServices { addToOnlineRegions(r); } + @Override + public void postOpenDeployTasks(PostOpenDeployContext context) throws KeeperException, + IOException { + addToOnlineRegions(context.getRegion()); + } + @Override public boolean isStopping() { return this.stopping; @@ -280,6 +286,11 @@ public class MockRegionServerServices implements RegionServerServices { return false; } + @Override + public boolean reportRegionStateTransition(RegionStateTransitionContext context) { + return false; + } + @Override public boolean registerService(Service service) { // TODO Auto-generated method stub diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java index 3275d156de3..07d1e51cfcc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.junit.AfterClass; @@ -354,20 +355,20 @@ public class TestMetaTableAccessor { Table meta = MetaTableAccessor.getMetaHTable(connection); try { - MetaTableAccessor.updateRegionLocation(connection, primary, serverName0, seqNum0); + MetaTableAccessor.updateRegionLocation(connection, primary, serverName0, seqNum0, -1); // assert that the server, startcode and seqNum columns are there for the primary region assertMetaLocation(meta, primary.getRegionName(), serverName0, seqNum0, 0, true); // add replica = 1 - MetaTableAccessor.updateRegionLocation(connection, replica1, serverName1, seqNum1); + MetaTableAccessor.updateRegionLocation(connection, replica1, serverName1, seqNum1, -1); // check whether the primary is still there assertMetaLocation(meta, primary.getRegionName(), serverName0, seqNum0, 0, true); // now check for replica 1 assertMetaLocation(meta, primary.getRegionName(), serverName1, seqNum1, 1, true); // add replica = 1 - MetaTableAccessor.updateRegionLocation(connection, replica100, serverName100, seqNum100); + MetaTableAccessor.updateRegionLocation(connection, replica100, serverName100, seqNum100, -1); // check whether the primary is still there assertMetaLocation(meta, primary.getRegionName(), serverName0, seqNum0, 0, true); // check whether the replica 1 is still there @@ -525,5 +526,42 @@ public class TestMetaTableAccessor { verify(visitor, times(1)).visit((Result) anyObject()); table.close(); } + + @Test + public void testMastersSystemTimeIsUsedInUpdateLocations() throws IOException { + long regionId = System.currentTimeMillis(); + HRegionInfo regionInfo = new HRegionInfo(TableName.valueOf("table_foo"), + HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false, regionId, 0); + + ServerName sn = ServerName.valueOf("bar", 0, 0); + Table meta = MetaTableAccessor.getMetaHTable(connection); + try { + List