HBASE-13709 Updates to meta table server columns may be eclipsed

This commit is contained in:
Enis Soztutar 2015-05-20 17:42:32 -07:00
parent 3a1e101dcd
commit eddabdd353
16 changed files with 417 additions and 120 deletions

View File

@ -1563,7 +1563,7 @@ public class MetaTableAccessor {
Put put = new Put(regionInfo.getRegionName(), now);
addRegionInfo(put, regionInfo);
if (sn != null) {
addLocation(put, sn, openSeqNum, regionInfo.getReplicaId());
addLocation(put, sn, openSeqNum, -1, regionInfo.getReplicaId());
}
putToMetaTable(connection, put);
LOG.info("Added daughter " + regionInfo.getEncodedName() +
@ -1600,7 +1600,7 @@ public class MetaTableAccessor {
Delete deleteB = makeDeleteFromRegionInfo(regionB);
// The merged region is new, so openSeqNum = 1 is fine.
addLocation(putOfMerged, sn, 1, mergedRegion.getReplicaId());
addLocation(putOfMerged, sn, 1, -1, mergedRegion.getReplicaId());
// Add empty locations for region replicas of the merged region so that number of replicas can
// be cached whenever the primary region is looked up from meta
@ -1644,8 +1644,8 @@ public class MetaTableAccessor {
Put putA = makePutFromRegionInfo(splitA);
Put putB = makePutFromRegionInfo(splitB);
addLocation(putA, sn, 1, splitA.getReplicaId()); //new regions, openSeqNum = 1 is fine.
addLocation(putB, sn, 1, splitB.getReplicaId());
addLocation(putA, sn, 1, -1, splitA.getReplicaId()); //new regions, openSeqNum = 1 is fine.
addLocation(putB, sn, 1, -1, splitB.getReplicaId());
// Add empty locations for region replicas of daughters so that number of replicas can be
// cached whenever the primary region is looked up from meta
@ -1742,13 +1742,16 @@ public class MetaTableAccessor {
*
* @param connection connection we're using
* @param regionInfo region to update location of
* @param openSeqNum the latest sequence number obtained when the region was open
* @param sn Server name
* @param masterSystemTime wall clock time from master if passed in the open region RPC or -1
* @throws IOException
*/
public static void updateRegionLocation(Connection connection,
HRegionInfo regionInfo, ServerName sn, long updateSeqNum)
HRegionInfo regionInfo, ServerName sn, long openSeqNum,
long masterSystemTime)
throws IOException {
updateLocation(connection, regionInfo, sn, updateSeqNum);
updateLocation(connection, regionInfo, sn, openSeqNum, masterSystemTime);
}
/**
@ -1761,16 +1764,21 @@ public class MetaTableAccessor {
* @param regionInfo region to update location of
* @param sn Server name
* @param openSeqNum the latest sequence number obtained when the region was open
* @param masterSystemTime wall clock time from master if passed in the open region RPC or -1
* @throws IOException In particular could throw {@link java.net.ConnectException}
* if the server is down on other end.
*/
private static void updateLocation(final Connection connection,
HRegionInfo regionInfo, ServerName sn, long openSeqNum)
HRegionInfo regionInfo, ServerName sn, long openSeqNum,
long masterSystemTime)
throws IOException {
// use the maximum of what master passed us vs local time.
long time = Math.max(EnvironmentEdgeManager.currentTime(), masterSystemTime);
// region replicas are kept in the primary region's row
long time = EnvironmentEdgeManager.currentTime();
Put put = new Put(getMetaKeyForRegion(regionInfo), time);
addLocation(put, sn, openSeqNum, regionInfo.getReplicaId());
addLocation(put, sn, openSeqNum, time, regionInfo.getReplicaId());
putToMetaTable(connection, put);
LOG.info("Updated row " + regionInfo.getRegionNameAsString() +
" with server=" + sn);
@ -1886,15 +1894,16 @@ public class MetaTableAccessor {
return p;
}
public static Put addLocation(final Put p, final ServerName sn, long openSeqNum, int replicaId){
// using regionserver's local time as the timestamp of Put.
// See: HBASE-11536
long now = EnvironmentEdgeManager.currentTime();
p.addImmutable(getCatalogFamily(), getServerColumn(replicaId), now,
public static Put addLocation(final Put p, final ServerName sn, long openSeqNum,
long time, int replicaId){
if (time <= 0) {
time = EnvironmentEdgeManager.currentTime();
}
p.addImmutable(getCatalogFamily(), getServerColumn(replicaId), time,
Bytes.toBytes(sn.getHostAndPort()));
p.addImmutable(getCatalogFamily(), getStartCodeColumn(replicaId), now,
p.addImmutable(getCatalogFamily(), getStartCodeColumn(replicaId), time,
Bytes.toBytes(sn.getStartcode()));
p.addImmutable(getCatalogFamily(), getSeqNumColumn(replicaId), now,
p.addImmutable(getCatalogFamily(), getSeqNumColumn(replicaId), time,
Bytes.toBytes(openSeqNum));
return p;
}

View File

@ -106,6 +106,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionReq
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import com.google.protobuf.ByteString;
@ -785,6 +786,8 @@ public final class RequestConverter {
if (server != null) {
builder.setServerStartCode(server.getStartcode());
}
// send the master's wall clock time as well, so that the RS can refer to it
builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
return builder.build();
}
@ -806,6 +809,7 @@ public final class RequestConverter {
if (server != null) {
builder.setServerStartCode(server.getStartcode());
}
builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
return builder.build();
}

View File

@ -3874,6 +3874,24 @@ public final class AdminProtos {
* </pre>
*/
long getServerStartCode();
// optional uint64 master_system_time = 5;
/**
* <code>optional uint64 master_system_time = 5;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
boolean hasMasterSystemTime();
/**
* <code>optional uint64 master_system_time = 5;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
long getMasterSystemTime();
}
/**
* Protobuf type {@code OpenRegionRequest}
@ -3939,6 +3957,11 @@ public final class AdminProtos {
serverStartCode_ = input.readUInt64();
break;
}
case 40: {
bitField0_ |= 0x00000002;
masterSystemTime_ = input.readUInt64();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -5195,9 +5218,34 @@ public final class AdminProtos {
return serverStartCode_;
}
// optional uint64 master_system_time = 5;
public static final int MASTER_SYSTEM_TIME_FIELD_NUMBER = 5;
private long masterSystemTime_;
/**
* <code>optional uint64 master_system_time = 5;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
public boolean hasMasterSystemTime() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
/**
* <code>optional uint64 master_system_time = 5;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
public long getMasterSystemTime() {
return masterSystemTime_;
}
private void initFields() {
openInfo_ = java.util.Collections.emptyList();
serverStartCode_ = 0L;
masterSystemTime_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -5223,6 +5271,9 @@ public final class AdminProtos {
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeUInt64(2, serverStartCode_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeUInt64(5, masterSystemTime_);
}
getUnknownFields().writeTo(output);
}
@ -5240,6 +5291,10 @@ public final class AdminProtos {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(2, serverStartCode_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(5, masterSystemTime_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -5270,6 +5325,11 @@ public final class AdminProtos {
result = result && (getServerStartCode()
== other.getServerStartCode());
}
result = result && (hasMasterSystemTime() == other.hasMasterSystemTime());
if (hasMasterSystemTime()) {
result = result && (getMasterSystemTime()
== other.getMasterSystemTime());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@ -5291,6 +5351,10 @@ public final class AdminProtos {
hash = (37 * hash) + SERVERSTARTCODE_FIELD_NUMBER;
hash = (53 * hash) + hashLong(getServerStartCode());
}
if (hasMasterSystemTime()) {
hash = (37 * hash) + MASTER_SYSTEM_TIME_FIELD_NUMBER;
hash = (53 * hash) + hashLong(getMasterSystemTime());
}
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@ -5409,6 +5473,8 @@ public final class AdminProtos {
}
serverStartCode_ = 0L;
bitField0_ = (bitField0_ & ~0x00000002);
masterSystemTime_ = 0L;
bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
@ -5450,6 +5516,10 @@ public final class AdminProtos {
to_bitField0_ |= 0x00000001;
}
result.serverStartCode_ = serverStartCode_;
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
to_bitField0_ |= 0x00000002;
}
result.masterSystemTime_ = masterSystemTime_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -5495,6 +5565,9 @@ public final class AdminProtos {
if (other.hasServerStartCode()) {
setServerStartCode(other.getServerStartCode());
}
if (other.hasMasterSystemTime()) {
setMasterSystemTime(other.getMasterSystemTime());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -5817,6 +5890,55 @@ public final class AdminProtos {
return this;
}
// optional uint64 master_system_time = 5;
private long masterSystemTime_ ;
/**
* <code>optional uint64 master_system_time = 5;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
public boolean hasMasterSystemTime() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional uint64 master_system_time = 5;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
public long getMasterSystemTime() {
return masterSystemTime_;
}
/**
* <code>optional uint64 master_system_time = 5;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
public Builder setMasterSystemTime(long value) {
bitField0_ |= 0x00000004;
masterSystemTime_ = value;
onChanged();
return this;
}
/**
* <code>optional uint64 master_system_time = 5;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
public Builder clearMasterSystemTime() {
bitField0_ = (bitField0_ & ~0x00000004);
masterSystemTime_ = 0L;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:OpenRegionRequest)
}
@ -23249,87 +23371,88 @@ public final class AdminProtos {
"FileResponse\022\022\n\nstore_file\030\001 \003(\t\"\030\n\026GetO" +
"nlineRegionRequest\";\n\027GetOnlineRegionRes" +
"ponse\022 \n\013region_info\030\001 \003(\0132\013.RegionInfo\"" +
"\374\001\n\021OpenRegionRequest\0224\n\topen_info\030\001 \003(\013" +
"\230\002\n\021OpenRegionRequest\0224\n\topen_info\030\001 \003(\013" +
"2!.OpenRegionRequest.RegionOpenInfo\022\027\n\017s" +
"erverStartCode\030\002 \001(\004\032\227\001\n\016RegionOpenInfo\022" +
"\033\n\006region\030\001 \002(\0132\013.RegionInfo\022\037\n\027version_" +
"of_offline_node\030\002 \001(\r\022\"\n\rfavored_nodes\030\003" +
" \003(\0132\013.ServerName\022#\n\033openForDistributedL",
"ogReplay\030\004 \001(\010\"\235\001\n\022OpenRegionResponse\022=\n" +
"\ropening_state\030\001 \003(\0162&.OpenRegionRespons" +
"e.RegionOpeningState\"H\n\022RegionOpeningSta" +
"te\022\n\n\006OPENED\020\000\022\022\n\016ALREADY_OPENED\020\001\022\022\n\016FA" +
"ILED_OPENING\020\002\"6\n\023WarmupRegionRequest\022\037\n" +
"\nregionInfo\030\001 \002(\0132\013.RegionInfo\"\026\n\024Warmup" +
"RegionResponse\"\271\001\n\022CloseRegionRequest\022 \n" +
"\006region\030\001 \002(\0132\020.RegionSpecifier\022\037\n\027versi" +
"on_of_closing_node\030\002 \001(\r\022\036\n\020transition_i" +
"n_ZK\030\003 \001(\010:\004true\022\'\n\022destination_server\030\004",
" \001(\0132\013.ServerName\022\027\n\017serverStartCode\030\005 \001" +
"(\004\"%\n\023CloseRegionResponse\022\016\n\006closed\030\001 \002(" +
"\010\"p\n\022FlushRegionRequest\022 \n\006region\030\001 \002(\0132" +
"\020.RegionSpecifier\022\030\n\020if_older_than_ts\030\002 " +
"\001(\004\022\036\n\026write_flush_wal_marker\030\003 \001(\010\"_\n\023F" +
"lushRegionResponse\022\027\n\017last_flush_time\030\001 " +
"\002(\004\022\017\n\007flushed\030\002 \001(\010\022\036\n\026wrote_flush_wal_" +
"marker\030\003 \001(\010\"K\n\022SplitRegionRequest\022 \n\006re" +
"gion\030\001 \002(\0132\020.RegionSpecifier\022\023\n\013split_po" +
"int\030\002 \001(\014\"\025\n\023SplitRegionResponse\"W\n\024Comp",
"actRegionRequest\022 \n\006region\030\001 \002(\0132\020.Regio" +
"nSpecifier\022\r\n\005major\030\002 \001(\010\022\016\n\006family\030\003 \001(" +
"\014\"\027\n\025CompactRegionResponse\"\262\001\n\031UpdateFav" +
"oredNodesRequest\022@\n\013update_info\030\001 \003(\0132+." +
"UpdateFavoredNodesRequest.RegionUpdateIn" +
"fo\032S\n\020RegionUpdateInfo\022\033\n\006region\030\001 \002(\0132\013" +
".RegionInfo\022\"\n\rfavored_nodes\030\002 \003(\0132\013.Ser" +
"verName\".\n\032UpdateFavoredNodesResponse\022\020\n" +
"\010response\030\001 \001(\r\"v\n\023MergeRegionsRequest\022\"" +
"\n\010region_a\030\001 \002(\0132\020.RegionSpecifier\022\"\n\010re",
"gion_b\030\002 \002(\0132\020.RegionSpecifier\022\027\n\010forcib" +
"le\030\003 \001(\010:\005false\"\026\n\024MergeRegionsResponse\"" +
"X\n\010WALEntry\022\024\n\003key\030\001 \002(\0132\007.WALKey\022\027\n\017key" +
"_value_bytes\030\002 \003(\014\022\035\n\025associated_cell_co" +
"unt\030\003 \001(\005\"4\n\030ReplicateWALEntryRequest\022\030\n" +
"\005entry\030\001 \003(\0132\t.WALEntry\"\033\n\031ReplicateWALE" +
"ntryResponse\"\026\n\024RollWALWriterRequest\"0\n\025" +
"RollWALWriterResponse\022\027\n\017region_to_flush" +
"\030\001 \003(\014\"#\n\021StopServerRequest\022\016\n\006reason\030\001 " +
"\002(\t\"\024\n\022StopServerResponse\"\026\n\024GetServerIn",
"foRequest\"B\n\nServerInfo\022 \n\013server_name\030\001" +
" \002(\0132\013.ServerName\022\022\n\nwebui_port\030\002 \001(\r\"9\n" +
"\025GetServerInfoResponse\022 \n\013server_info\030\001 " +
"\002(\0132\013.ServerInfo\"\034\n\032UpdateConfigurationR" +
"equest\"\035\n\033UpdateConfigurationResponse2\325\010" +
"\n\014AdminService\022>\n\rGetRegionInfo\022\025.GetReg" +
"ionInfoRequest\032\026.GetRegionInfoResponse\022;" +
"\n\014GetStoreFile\022\024.GetStoreFileRequest\032\025.G" +
"etStoreFileResponse\022D\n\017GetOnlineRegion\022\027" +
".GetOnlineRegionRequest\032\030.GetOnlineRegio",
"nResponse\0225\n\nOpenRegion\022\022.OpenRegionRequ" +
"est\032\023.OpenRegionResponse\022;\n\014WarmupRegion" +
"\022\024.WarmupRegionRequest\032\025.WarmupRegionRes" +
"ponse\0228\n\013CloseRegion\022\023.CloseRegionReques" +
"t\032\024.CloseRegionResponse\0228\n\013FlushRegion\022\023" +
".FlushRegionRequest\032\024.FlushRegionRespons" +
"e\0228\n\013SplitRegion\022\023.SplitRegionRequest\032\024." +
"SplitRegionResponse\022>\n\rCompactRegion\022\025.C" +
"ompactRegionRequest\032\026.CompactRegionRespo" +
"nse\022;\n\014MergeRegions\022\024.MergeRegionsReques",
"t\032\025.MergeRegionsResponse\022J\n\021ReplicateWAL" +
"Entry\022\031.ReplicateWALEntryRequest\032\032.Repli" +
"cateWALEntryResponse\022?\n\006Replay\022\031.Replica" +
"teWALEntryRequest\032\032.ReplicateWALEntryRes" +
"ponse\022>\n\rRollWALWriter\022\025.RollWALWriterRe" +
"quest\032\026.RollWALWriterResponse\022>\n\rGetServ" +
"erInfo\022\025.GetServerInfoRequest\032\026.GetServe" +
"rInfoResponse\0225\n\nStopServer\022\022.StopServer" +
"Request\032\023.StopServerResponse\022M\n\022UpdateFa" +
"voredNodes\022\032.UpdateFavoredNodesRequest\032\033",
".UpdateFavoredNodesResponse\022P\n\023UpdateCon" +
"figuration\022\033.UpdateConfigurationRequest\032" +
"\034.UpdateConfigurationResponseBA\n*org.apa" +
"che.hadoop.hbase.protobuf.generatedB\013Adm" +
"inProtosH\001\210\001\001\240\001\001"
"erverStartCode\030\002 \001(\004\022\032\n\022master_system_ti" +
"me\030\005 \001(\004\032\227\001\n\016RegionOpenInfo\022\033\n\006region\030\001 " +
"\002(\0132\013.RegionInfo\022\037\n\027version_of_offline_n" +
"ode\030\002 \001(\r\022\"\n\rfavored_nodes\030\003 \003(\0132\013.Serve",
"rName\022#\n\033openForDistributedLogReplay\030\004 \001" +
"(\010\"\235\001\n\022OpenRegionResponse\022=\n\ropening_sta" +
"te\030\001 \003(\0162&.OpenRegionResponse.RegionOpen" +
"ingState\"H\n\022RegionOpeningState\022\n\n\006OPENED" +
"\020\000\022\022\n\016ALREADY_OPENED\020\001\022\022\n\016FAILED_OPENING" +
"\020\002\"6\n\023WarmupRegionRequest\022\037\n\nregionInfo\030" +
"\001 \002(\0132\013.RegionInfo\"\026\n\024WarmupRegionRespon" +
"se\"\271\001\n\022CloseRegionRequest\022 \n\006region\030\001 \002(" +
"\0132\020.RegionSpecifier\022\037\n\027version_of_closin" +
"g_node\030\002 \001(\r\022\036\n\020transition_in_ZK\030\003 \001(\010:\004",
"true\022\'\n\022destination_server\030\004 \001(\0132\013.Serve" +
"rName\022\027\n\017serverStartCode\030\005 \001(\004\"%\n\023CloseR" +
"egionResponse\022\016\n\006closed\030\001 \002(\010\"p\n\022FlushRe" +
"gionRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpec" +
"ifier\022\030\n\020if_older_than_ts\030\002 \001(\004\022\036\n\026write" +
"_flush_wal_marker\030\003 \001(\010\"_\n\023FlushRegionRe" +
"sponse\022\027\n\017last_flush_time\030\001 \002(\004\022\017\n\007flush" +
"ed\030\002 \001(\010\022\036\n\026wrote_flush_wal_marker\030\003 \001(\010" +
"\"K\n\022SplitRegionRequest\022 \n\006region\030\001 \002(\0132\020" +
".RegionSpecifier\022\023\n\013split_point\030\002 \001(\014\"\025\n",
"\023SplitRegionResponse\"W\n\024CompactRegionReq" +
"uest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\r" +
"\n\005major\030\002 \001(\010\022\016\n\006family\030\003 \001(\014\"\027\n\025Compact" +
"RegionResponse\"\262\001\n\031UpdateFavoredNodesReq" +
"uest\022@\n\013update_info\030\001 \003(\0132+.UpdateFavore" +
"dNodesRequest.RegionUpdateInfo\032S\n\020Region" +
"UpdateInfo\022\033\n\006region\030\001 \002(\0132\013.RegionInfo\022" +
"\"\n\rfavored_nodes\030\002 \003(\0132\013.ServerName\".\n\032U" +
"pdateFavoredNodesResponse\022\020\n\010response\030\001 " +
"\001(\r\"v\n\023MergeRegionsRequest\022\"\n\010region_a\030\001",
" \002(\0132\020.RegionSpecifier\022\"\n\010region_b\030\002 \002(\013" +
"2\020.RegionSpecifier\022\027\n\010forcible\030\003 \001(\010:\005fa" +
"lse\"\026\n\024MergeRegionsResponse\"X\n\010WALEntry\022" +
"\024\n\003key\030\001 \002(\0132\007.WALKey\022\027\n\017key_value_bytes" +
"\030\002 \003(\014\022\035\n\025associated_cell_count\030\003 \001(\005\"4\n" +
"\030ReplicateWALEntryRequest\022\030\n\005entry\030\001 \003(\013" +
"2\t.WALEntry\"\033\n\031ReplicateWALEntryResponse" +
"\"\026\n\024RollWALWriterRequest\"0\n\025RollWALWrite" +
"rResponse\022\027\n\017region_to_flush\030\001 \003(\014\"#\n\021St" +
"opServerRequest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopS",
"erverResponse\"\026\n\024GetServerInfoRequest\"B\n" +
"\nServerInfo\022 \n\013server_name\030\001 \002(\0132\013.Serve" +
"rName\022\022\n\nwebui_port\030\002 \001(\r\"9\n\025GetServerIn" +
"foResponse\022 \n\013server_info\030\001 \002(\0132\013.Server" +
"Info\"\034\n\032UpdateConfigurationRequest\"\035\n\033Up" +
"dateConfigurationResponse2\325\010\n\014AdminServi" +
"ce\022>\n\rGetRegionInfo\022\025.GetRegionInfoReque" +
"st\032\026.GetRegionInfoResponse\022;\n\014GetStoreFi" +
"le\022\024.GetStoreFileRequest\032\025.GetStoreFileR" +
"esponse\022D\n\017GetOnlineRegion\022\027.GetOnlineRe",
"gionRequest\032\030.GetOnlineRegionResponse\0225\n" +
"\nOpenRegion\022\022.OpenRegionRequest\032\023.OpenRe" +
"gionResponse\022;\n\014WarmupRegion\022\024.WarmupReg" +
"ionRequest\032\025.WarmupRegionResponse\0228\n\013Clo" +
"seRegion\022\023.CloseRegionRequest\032\024.CloseReg" +
"ionResponse\0228\n\013FlushRegion\022\023.FlushRegion" +
"Request\032\024.FlushRegionResponse\0228\n\013SplitRe" +
"gion\022\023.SplitRegionRequest\032\024.SplitRegionR" +
"esponse\022>\n\rCompactRegion\022\025.CompactRegion" +
"Request\032\026.CompactRegionResponse\022;\n\014Merge",
"Regions\022\024.MergeRegionsRequest\032\025.MergeReg" +
"ionsResponse\022J\n\021ReplicateWALEntry\022\031.Repl" +
"icateWALEntryRequest\032\032.ReplicateWALEntry" +
"Response\022?\n\006Replay\022\031.ReplicateWALEntryRe" +
"quest\032\032.ReplicateWALEntryResponse\022>\n\rRol" +
"lWALWriter\022\025.RollWALWriterRequest\032\026.Roll" +
"WALWriterResponse\022>\n\rGetServerInfo\022\025.Get" +
"ServerInfoRequest\032\026.GetServerInfoRespons" +
"e\0225\n\nStopServer\022\022.StopServerRequest\032\023.St" +
"opServerResponse\022M\n\022UpdateFavoredNodes\022\032",
".UpdateFavoredNodesRequest\032\033.UpdateFavor" +
"edNodesResponse\022P\n\023UpdateConfiguration\022\033" +
".UpdateConfigurationRequest\032\034.UpdateConf" +
"igurationResponseBA\n*org.apache.hadoop.h" +
"base.protobuf.generatedB\013AdminProtosH\001\210\001" +
"\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -23377,7 +23500,7 @@ public final class AdminProtos {
internal_static_OpenRegionRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_OpenRegionRequest_descriptor,
new java.lang.String[] { "OpenInfo", "ServerStartCode", });
new java.lang.String[] { "OpenInfo", "ServerStartCode", "MasterSystemTime", });
internal_static_OpenRegionRequest_RegionOpenInfo_descriptor =
internal_static_OpenRegionRequest_descriptor.getNestedTypes().get(0);
internal_static_OpenRegionRequest_RegionOpenInfo_fieldAccessorTable = new

View File

@ -70,6 +70,8 @@ message OpenRegionRequest {
repeated RegionOpenInfo open_info = 1;
// the intended server for this RPC.
optional uint64 serverStartCode = 2;
// wall clock time from master
optional uint64 master_system_time = 5;
message RegionOpenInfo {
required RegionInfo region = 1;

View File

@ -198,7 +198,7 @@ public class RegionStateStore {
if (openSeqNum >= 0) {
Preconditions.checkArgument(state == State.OPEN
&& serverName != null, "Open region should be on a server");
MetaTableAccessor.addLocation(put, serverName, openSeqNum, replicaId);
MetaTableAccessor.addLocation(put, serverName, openSeqNum, -1, replicaId);
info.append("&openSeqNum=").append(openSeqNum);
info.append("&server=").append(serverName);
}

View File

@ -1875,6 +1875,14 @@ public class HRegionServer extends HasThread implements
/**
 * Legacy entry point that only receives the opened region. Wraps it in a
 * {@code PostOpenDeployContext} with a master system time of -1 and hands it to the
 * context-based overload.
 *
 * @param r the region that was opened
 * @throws KeeperException propagated from the context-based overload
 * @throws IOException propagated from the context-based overload
 */
@Override
public void postOpenDeployTasks(final Region r) throws KeeperException, IOException {
  // -1: no master wall clock time is available on this code path.
  final PostOpenDeployContext deployContext = new PostOpenDeployContext(r, -1);
  postOpenDeployTasks(deployContext);
}
@Override
public void postOpenDeployTasks(final PostOpenDeployContext context)
throws KeeperException, IOException {
Region r = context.getRegion();
long masterSystemTime = context.getMasterSystemTime();
Preconditions.checkArgument(r instanceof HRegion, "r must be an HRegion");
rpcServices.checkOpen();
LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString());
@ -1896,8 +1904,8 @@ public class HRegionServer extends HasThread implements
updateRecoveringRegionLastFlushedSequenceId(r);
// Notify master
if (!reportRegionStateTransition(
TransitionCode.OPENED, openSeqNum, r.getRegionInfo())) {
if (!reportRegionStateTransition(new RegionStateTransitionContext(
TransitionCode.OPENED, openSeqNum, masterSystemTime, r.getRegionInfo()))) {
throw new IOException("Failed to report opened region to master: "
+ r.getRegionInfo().getRegionNameAsString());
}
@ -1915,6 +1923,17 @@ public class HRegionServer extends HasThread implements
/**
 * Legacy overload that reports a region state transition without a master-supplied
 * timestamp. Packs the arguments into a {@code RegionStateTransitionContext} and delegates
 * to the context-based overload.
 *
 * @param code the transition being reported
 * @param openSeqNum the sequence number obtained when the region was opened; forwarded
 *          unchanged so the context-based overload sees the caller's value
 * @param hris the region(s) the transition applies to
 * @return whatever the context-based overload returns
 */
@Override
public boolean reportRegionStateTransition(
    TransitionCode code, long openSeqNum, HRegionInfo... hris) {
  // Forward the caller's openSeqNum instead of discarding it in favour of NO_SEQNUM.
  // -1 marks the master system time as not available on this code path.
  return reportRegionStateTransition(
    new RegionStateTransitionContext(code, openSeqNum, -1, hris));
}
@Override
public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
TransitionCode code = context.getCode();
long openSeqNum = context.getOpenSeqNum();
long masterSystemTime = context.getMasterSystemTime();
HRegionInfo[] hris = context.getHris();
if (TEST_SKIP_REPORTING_TRANSITION) {
// This is for testing only in case there is no master
// to handle the region transition report at all.
@ -1931,7 +1950,7 @@ public class HRegionServer extends HasThread implements
} else {
try {
MetaTableAccessor.updateRegionLocation(clusterConnection,
hris[0], serverName, openSeqNum);
hris[0], serverName, openSeqNum, masterSystemTime);
} catch (IOException e) {
LOG.info("Failed to update meta", e);
return false;

View File

@ -1405,6 +1405,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
}
long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion());
HTableDescriptor htd;
@ -1473,12 +1476,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// Need to pass the expected version in the constructor.
if (region.isMetaRegion()) {
regionServer.service.submit(new OpenMetaHandler(
regionServer, regionServer, region, htd));
regionServer, regionServer, region, htd, masterSystemTime));
} else {
regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
regionOpenInfo.getFavoredNodesList());
regionServer.service.submit(new OpenRegionHandler(
regionServer, regionServer, region, htd));
regionServer, regionServer, region, htd, masterSystemTime));
}
}

View File

@ -79,6 +79,36 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
*/
RegionServerQuotaManager getRegionServerQuotaManager();
/**
 * Argument holder for {@code postOpenDeployTasks(PostOpenDeployContext)}: bundles the opened
 * region with the master system time that accompanies the open. Both values are fixed at
 * construction.
 */
class PostOpenDeployContext {
  private final long masterSystemTime;
  private final Region region;

  /**
   * @param region the region the post-open tasks apply to
   * @param masterSystemTime master system time for this open; callers in this interface's
   *          implementations pass -1 when none is available
   */
  @InterfaceAudience.Private
  public PostOpenDeployContext(Region region, long masterSystemTime) {
    this.masterSystemTime = masterSystemTime;
    this.region = region;
  }

  /** @return the master system time given at construction */
  public long getMasterSystemTime() {
    return this.masterSystemTime;
  }

  /** @return the region given at construction */
  public Region getRegion() {
    return this.region;
  }
}
/**
* Tasks to perform after region open to complete deploy of region on
* regionserver
*
* @param context the context
* @throws KeeperException
* @throws IOException
*/
void postOpenDeployTasks(final PostOpenDeployContext context) throws KeeperException, IOException;
/**
* Tasks to perform after region open to complete deploy of region on
* regionserver
@ -86,17 +116,56 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
* @param r Region to open.
* @throws KeeperException
* @throws IOException
* @deprecated use {@link #postOpenDeployTasks(PostOpenDeployContext)}
*/
@Deprecated
void postOpenDeployTasks(final Region r) throws KeeperException, IOException;
/**
 * Argument holder for {@code reportRegionStateTransition(RegionStateTransitionContext)}:
 * the transition code, the open sequence number, the master system time and the affected
 * regions. All values are fixed at construction.
 */
class RegionStateTransitionContext {
  private final HRegionInfo[] hris;
  private final long masterSystemTime;
  private final long openSeqNum;
  private final TransitionCode code;

  /**
   * @param code the transition being reported
   * @param openSeqNum the open sequence number to report
   * @param masterSystemTime master system time associated with the transition
   * @param hris the region(s) the transition applies to
   */
  @InterfaceAudience.Private
  public RegionStateTransitionContext(TransitionCode code, long openSeqNum, long masterSystemTime,
      HRegionInfo... hris) {
    this.hris = hris;
    this.masterSystemTime = masterSystemTime;
    this.openSeqNum = openSeqNum;
    this.code = code;
  }

  /** @return the region array given at construction (the same array, not a copy) */
  public HRegionInfo[] getHris() {
    return this.hris;
  }

  /** @return the master system time given at construction */
  public long getMasterSystemTime() {
    return this.masterSystemTime;
  }

  /** @return the open sequence number given at construction */
  public long getOpenSeqNum() {
    return this.openSeqNum;
  }

  /** @return the transition code given at construction */
  public TransitionCode getCode() {
    return this.code;
  }
}
/**
* Notify master that a handler requests to change a region state
*/
boolean reportRegionStateTransition(final RegionStateTransitionContext context);
/**
* Notify master that a handler requests to change a region state
* @deprecated use {@link #reportRegionStateTransition(RegionStateTransitionContext)}
*/
@Deprecated
boolean reportRegionStateTransition(TransitionCode code, long openSeqNum, HRegionInfo... hris);
/**
* Notify master that a handler requests to change a region state
* @deprecated use {@link #reportRegionStateTransition(RegionStateTransitionContext)}
*/
@Deprecated
boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris);
/**

View File

@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
public class OpenMetaHandler extends OpenRegionHandler {
public OpenMetaHandler(final Server server,
final RegionServerServices rsServices, HRegionInfo regionInfo,
final HTableDescriptor htd) {
super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_META);
final HTableDescriptor htd, long masterSystemTime) {
super(server, rsServices, regionInfo, htd, masterSystemTime, EventType.M_RS_OPEN_META);
}
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext;
import org.apache.hadoop.hbase.util.CancelableProgressable;
/**
* Handles opening of a region on a region server.
@ -47,20 +48,22 @@ public class OpenRegionHandler extends EventHandler {
private final HRegionInfo regionInfo;
private final HTableDescriptor htd;
private final long masterSystemTime;
public OpenRegionHandler(final Server server,
final RegionServerServices rsServices, HRegionInfo regionInfo,
HTableDescriptor htd) {
this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION);
HTableDescriptor htd, long masterSystemTime) {
this(server, rsServices, regionInfo, htd, masterSystemTime, EventType.M_RS_OPEN_REGION);
}
protected OpenRegionHandler(final Server server,
final RegionServerServices rsServices, final HRegionInfo regionInfo,
final HTableDescriptor htd, EventType eventType) {
final HTableDescriptor htd, long masterSystemTime, EventType eventType) {
super(server, eventType);
this.rsServices = rsServices;
this.regionInfo = regionInfo;
this.htd = htd;
this.masterSystemTime = masterSystemTime;
}
public HRegionInfo getRegionInfo() {
@ -105,7 +108,7 @@ public class OpenRegionHandler extends EventHandler {
return;
}
if (!updateMeta(region) || this.server.isStopped() ||
if (!updateMeta(region, masterSystemTime) || this.server.isStopped() ||
this.rsServices.isStopping()) {
return;
}
@ -167,7 +170,7 @@ public class OpenRegionHandler extends EventHandler {
* state meantime so master doesn't timeout our region-in-transition.
* Caller must cleanup region if this fails.
*/
boolean updateMeta(final HRegion r) {
boolean updateMeta(final HRegion r, long masterSystemTime) {
if (this.server.isStopped() || this.rsServices.isStopping()) {
return false;
}
@ -175,7 +178,7 @@ public class OpenRegionHandler extends EventHandler {
// Else, wait.
final AtomicBoolean signaller = new AtomicBoolean(false);
PostOpenDeployTasksThread t = new PostOpenDeployTasksThread(r,
this.server, this.rsServices, signaller);
this.server, this.rsServices, signaller, masterSystemTime);
t.start();
// Post open deploy task:
// meta => update meta location in ZK
@ -225,20 +228,23 @@ public class OpenRegionHandler extends EventHandler {
private final RegionServerServices services;
private final HRegion region;
private final AtomicBoolean signaller;
private final long masterSystemTime;
PostOpenDeployTasksThread(final HRegion region, final Server server,
final RegionServerServices services, final AtomicBoolean signaller) {
final RegionServerServices services, final AtomicBoolean signaller, long masterSystemTime) {
super("PostOpenDeployTasks:" + region.getRegionInfo().getEncodedName());
this.setDaemon(true);
this.server = server;
this.services = services;
this.region = region;
this.signaller = signaller;
this.masterSystemTime = masterSystemTime;
}
@Override
public void run() {
try {
this.services.postOpenDeployTasks(this.region);
this.services.postOpenDeployTasks(new PostOpenDeployContext(region, masterSystemTime));
} catch (Throwable e) {
String msg = "Exception running postOpenDeployTasks; region=" +
this.region.getRegionInfo().getEncodedName();
@ -278,6 +284,7 @@ public class OpenRegionHandler extends EventHandler {
this.server.getConfiguration(),
this.rsServices,
new CancelableProgressable() {
@Override
public boolean progress() {
if (!isRegionStillOpening()) {
LOG.warn("Open region aborted since it isn't opening any more");

View File

@ -149,7 +149,7 @@ public class HBaseFsckRepair {
* (default 120s) to close the region. This bypasses the active hmaster.
*/
@SuppressWarnings("deprecation")
public static void closeRegionSilentlyAndWait(HConnection connection,
public static void closeRegionSilentlyAndWait(HConnection connection,
ServerName server, HRegionInfo region) throws IOException, InterruptedException {
long timeout = connection.getConfiguration()
.getLong("hbase.hbck.close.timeout", 120000);
@ -174,7 +174,7 @@ public class HBaseFsckRepair {
// see the additional replicas when it is asked to assign. The
// final value of these columns will be different and will be updated
// by the actual regionservers that start hosting the respective replicas
MetaTableAccessor.addLocation(put, sn, sn.getStartcode(), i);
MetaTableAccessor.addLocation(put, sn, sn.getStartcode(), -1, i);
}
}
meta.put(put);

View File

@ -119,6 +119,12 @@ public class MockRegionServerServices implements RegionServerServices {
addToOnlineRegions(r);
}
/**
 * Context-based variant of the post-open hook for this mock: takes the region carried by
 * {@code context} and hands it to {@code addToOnlineRegions}. Nothing else in the context is
 * read here.
 *
 * @param context wrapper whose {@code getRegion()} result is registered
 * @throws KeeperException declared by the signature; not thrown directly by this body
 * @throws IOException declared by the signature; not thrown directly by this body
 */
@Override
public void postOpenDeployTasks(PostOpenDeployContext context) throws KeeperException,
IOException {
addToOnlineRegions(context.getRegion());
}
@Override
public boolean isStopping() {
return this.stopping;
@ -280,6 +286,11 @@ public class MockRegionServerServices implements RegionServerServices {
return false;
}
/**
 * Context-based variant of the region state transition report. This implementation does no
 * work and ignores {@code context}.
 *
 * @param context transition details; unused here
 * @return always {@code false}
 */
@Override
public boolean reportRegionStateTransition(RegionStateTransitionContext context) {
return false;
}
@Override
public boolean registerService(Service service) {
// TODO Auto-generated method stub

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.junit.AfterClass;
@ -354,20 +355,20 @@ public class TestMetaTableAccessor {
Table meta = MetaTableAccessor.getMetaHTable(connection);
try {
MetaTableAccessor.updateRegionLocation(connection, primary, serverName0, seqNum0);
MetaTableAccessor.updateRegionLocation(connection, primary, serverName0, seqNum0, -1);
// assert that the server, startcode and seqNum columns are there for the primary region
assertMetaLocation(meta, primary.getRegionName(), serverName0, seqNum0, 0, true);
// add replica = 1
MetaTableAccessor.updateRegionLocation(connection, replica1, serverName1, seqNum1);
MetaTableAccessor.updateRegionLocation(connection, replica1, serverName1, seqNum1, -1);
// check whether the primary is still there
assertMetaLocation(meta, primary.getRegionName(), serverName0, seqNum0, 0, true);
// now check for replica 1
assertMetaLocation(meta, primary.getRegionName(), serverName1, seqNum1, 1, true);
// add replica = 100
MetaTableAccessor.updateRegionLocation(connection, replica100, serverName100, seqNum100);
MetaTableAccessor.updateRegionLocation(connection, replica100, serverName100, seqNum100, -1);
// check whether the primary is still there
assertMetaLocation(meta, primary.getRegionName(), serverName0, seqNum0, 0, true);
// check whether the replica 1 is still there
@ -525,5 +526,42 @@ public class TestMetaTableAccessor {
verify(visitor, times(1)).visit((Result) anyObject());
table.close();
}
/**
 * Verifies that the timestamp handed to
 * {@code MetaTableAccessor.updateRegionLocation(..., masterSystemTime)} is the timestamp of the
 * server, start code and sequence number cells written to the meta table for replica 0.
 */
@Test
public void testMastersSystemTimeIsUsedInUpdateLocations() throws IOException {
  long regionId = System.currentTimeMillis();
  HRegionInfo regionInfo = new HRegionInfo(TableName.valueOf("table_foo"),
      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false, regionId, 0);
  ServerName sn = ServerName.valueOf("bar", 0, 0);

  Table meta = MetaTableAccessor.getMetaHTable(connection);
  try {
    // Seed meta with the region before recording a location for it.
    List<HRegionInfo> toAdd = Lists.newArrayList(regionInfo);
    MetaTableAccessor.addRegionsToMeta(connection, toAdd, 1);

    // Use a time well away from the local clock so a locally generated timestamp cannot
    // match by accident.
    long masterSystemTime = EnvironmentEdgeManager.currentTime() + 123456789;
    MetaTableAccessor.updateRegionLocation(connection, regionInfo, sn, 1, masterSystemTime);

    Result row = meta.get(new Get(regionInfo.getRegionName()));
    Cell[] locationCells = {
      row.getColumnLatestCell(HConstants.CATALOG_FAMILY, MetaTableAccessor.getServerColumn(0)),
      row.getColumnLatestCell(HConstants.CATALOG_FAMILY,
          MetaTableAccessor.getStartCodeColumn(0)),
      row.getColumnLatestCell(HConstants.CATALOG_FAMILY, MetaTableAccessor.getSeqNumColumn(0))
    };

    // Checks run in three passes (present, non-empty, timestamp) over server, start code and
    // sequence number, in that order.
    for (Cell cell : locationCells) {
      assertNotNull(cell);
    }
    for (Cell cell : locationCells) {
      assertTrue(cell.getValueLength() > 0);
    }
    for (Cell cell : locationCells) {
      assertEquals(masterSystemTime, cell.getTimestamp());
    }
  } finally {
    meta.close();
  }
}
}

View File

@ -330,6 +330,7 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
return new NullTableLockManager();
}
/**
 * Stub accessor: no quota manager is provided by this implementation.
 *
 * @return always {@code null}
 */
@Override
public RegionServerQuotaManager getRegionServerQuotaManager() {
return null;
}
@ -339,6 +340,12 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
// TODO Auto-generated method stub
}
/**
 * Context-based variant of the post-open hook. Intentionally empty in this implementation:
 * {@code context} is ignored and nothing is done.
 *
 * @param context post-open details; unused here
 * @throws KeeperException declared by the signature; never thrown by this empty body
 * @throws IOException declared by the signature; never thrown by this empty body
 */
@Override
public void postOpenDeployTasks(PostOpenDeployContext context) throws KeeperException,
IOException {
// TODO Auto-generated method stub
}
@Override
public RpcServerInterface getRpcServer() {
// TODO Auto-generated method stub
@ -610,6 +617,11 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
return false;
}
/**
 * Context-based variant of the region state transition report. This implementation does no
 * work and ignores {@code context}.
 *
 * @param context transition details; unused here
 * @return always {@code false}
 */
@Override
public boolean reportRegionStateTransition(RegionStateTransitionContext context) {
return false;
}
@Override
public boolean registerService(Service service) {
// TODO Auto-generated method stub

View File

@ -259,7 +259,7 @@ public class TestRegionServerNoMaster {
// Let's start the open handler
HTableDescriptor htd = getRS().tableDescriptors.get(hri.getTable());
getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd));
getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd, -1));
// The open handler should have removed the region from RIT but kept the region closed
checkRegionIsClosed(HTU, getRS(), hri);

View File

@ -757,7 +757,7 @@ public class TestHBaseFsck {
Collection<ServerName> var = admin.getClusterStatus().getServers();
ServerName sn = var.toArray(new ServerName[var.size()])[0];
//add a location with replicaId as 2 (since we already have replicas with replicaid 0 and 1)
MetaTableAccessor.addLocation(put, sn, sn.getStartcode(), 2);
MetaTableAccessor.addLocation(put, sn, sn.getStartcode(), -1, 2);
meta.put(put);
// assign the new replica
HBaseFsckRepair.fixUnassigned(admin, newHri);