HBASE-13709 Updates to meta table server columns may be eclipsed

Conflicts:
	hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
This commit is contained in:
Enis Soztutar 2015-05-20 17:42:32 -07:00
parent c9f052be66
commit 420baa42ce
19 changed files with 430 additions and 133 deletions

View File

@ -1183,7 +1183,7 @@ public class MetaTableAccessor {
Put put = new Put(regionInfo.getRegionName()); Put put = new Put(regionInfo.getRegionName());
addRegionInfo(put, regionInfo); addRegionInfo(put, regionInfo);
if (sn != null) { if (sn != null) {
addLocation(put, sn, openSeqNum, regionInfo.getReplicaId()); addLocation(put, sn, openSeqNum, -1, regionInfo.getReplicaId());
} }
putToMetaTable(connection, put); putToMetaTable(connection, put);
LOG.info("Added daughter " + regionInfo.getEncodedName() + LOG.info("Added daughter " + regionInfo.getEncodedName() +
@ -1220,7 +1220,7 @@ public class MetaTableAccessor {
Delete deleteB = makeDeleteFromRegionInfo(regionB); Delete deleteB = makeDeleteFromRegionInfo(regionB);
// The merged is a new region, openSeqNum = 1 is fine. // The merged is a new region, openSeqNum = 1 is fine.
addLocation(putOfMerged, sn, 1, mergedRegion.getReplicaId()); addLocation(putOfMerged, sn, 1, -1, mergedRegion.getReplicaId());
// Add empty locations for region replicas of the merged region so that number of replicas can // Add empty locations for region replicas of the merged region so that number of replicas can
// be cached whenever the primary region is looked up from meta // be cached whenever the primary region is looked up from meta
@ -1264,8 +1264,8 @@ public class MetaTableAccessor {
Put putA = makePutFromRegionInfo(splitA); Put putA = makePutFromRegionInfo(splitA);
Put putB = makePutFromRegionInfo(splitB); Put putB = makePutFromRegionInfo(splitB);
addLocation(putA, sn, 1, splitA.getReplicaId()); //new regions, openSeqNum = 1 is fine. addLocation(putA, sn, 1, -1, splitA.getReplicaId()); //new regions, openSeqNum = 1 is fine.
addLocation(putB, sn, 1, splitB.getReplicaId()); addLocation(putB, sn, 1, -1, splitB.getReplicaId());
// Add empty locations for region replicas of daughters so that number of replicas can be // Add empty locations for region replicas of daughters so that number of replicas can be
// cached whenever the primary region is looked up from meta // cached whenever the primary region is looked up from meta
@ -1320,13 +1320,16 @@ public class MetaTableAccessor {
* *
* @param connection connection we're using * @param connection connection we're using
* @param regionInfo region to update location of * @param regionInfo region to update location of
* @param openSeqNum the latest sequence number obtained when the region was open
* @param sn Server name * @param sn Server name
* @param masterSystemTime wall clock time from master if passed in the open region RPC or -1
* @throws IOException * @throws IOException
*/ */
public static void updateRegionLocation(Connection connection, public static void updateRegionLocation(Connection connection,
HRegionInfo regionInfo, ServerName sn, long updateSeqNum) HRegionInfo regionInfo, ServerName sn, long openSeqNum,
long masterSystemTime)
throws IOException { throws IOException {
updateLocation(connection, regionInfo, sn, updateSeqNum); updateLocation(connection, regionInfo, sn, openSeqNum, masterSystemTime);
} }
/** /**
@ -1339,15 +1342,21 @@ public class MetaTableAccessor {
* @param regionInfo region to update location of * @param regionInfo region to update location of
* @param sn Server name * @param sn Server name
* @param openSeqNum the latest sequence number obtained when the region was open * @param openSeqNum the latest sequence number obtained when the region was open
* @param masterSystemTime wall clock time from master if passed in the open region RPC or -1
* @throws IOException In particular could throw {@link java.net.ConnectException} * @throws IOException In particular could throw {@link java.net.ConnectException}
* if the server is down on other end. * if the server is down on other end.
*/ */
private static void updateLocation(final Connection connection, private static void updateLocation(final Connection connection,
HRegionInfo regionInfo, ServerName sn, long openSeqNum) HRegionInfo regionInfo, ServerName sn, long openSeqNum,
long masterSystemTime)
throws IOException { throws IOException {
// use the maximum of what master passed us vs local time.
long time = Math.max(EnvironmentEdgeManager.currentTime(), masterSystemTime);
// region replicas are kept in the primary region's row // region replicas are kept in the primary region's row
Put put = new Put(getMetaKeyForRegion(regionInfo)); Put put = new Put(getMetaKeyForRegion(regionInfo), time);
addLocation(put, sn, openSeqNum, regionInfo.getReplicaId()); addLocation(put, sn, openSeqNum, time, regionInfo.getReplicaId());
putToMetaTable(connection, put); putToMetaTable(connection, put);
LOG.info("Updated row " + regionInfo.getRegionNameAsString() + LOG.info("Updated row " + regionInfo.getRegionNameAsString() +
" with server=" + sn); " with server=" + sn);
@ -1457,15 +1466,16 @@ public class MetaTableAccessor {
return p; return p;
} }
public static Put addLocation(final Put p, final ServerName sn, long openSeqNum, int replicaId){ public static Put addLocation(final Put p, final ServerName sn, long openSeqNum,
// using regionserver's local time as the timestamp of Put. long time, int replicaId){
// See: HBASE-11536 if (time <= 0) {
long now = EnvironmentEdgeManager.currentTime(); time = EnvironmentEdgeManager.currentTime();
p.addImmutable(HConstants.CATALOG_FAMILY, getServerColumn(replicaId), now, }
p.addImmutable(HConstants.CATALOG_FAMILY, getServerColumn(replicaId), time,
Bytes.toBytes(sn.getHostAndPort())); Bytes.toBytes(sn.getHostAndPort()));
p.addImmutable(HConstants.CATALOG_FAMILY, getStartCodeColumn(replicaId), now, p.addImmutable(HConstants.CATALOG_FAMILY, getStartCodeColumn(replicaId), time,
Bytes.toBytes(sn.getStartcode())); Bytes.toBytes(sn.getStartcode()));
p.addImmutable(HConstants.CATALOG_FAMILY, getSeqNumColumn(replicaId), now, p.addImmutable(HConstants.CATALOG_FAMILY, getSeqNumColumn(replicaId), time,
Bytes.toBytes(openSeqNum)); Bytes.toBytes(openSeqNum));
return p; return p;
} }

View File

@ -107,6 +107,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionReq
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Triple; import org.apache.hadoop.hbase.util.Triple;
@ -789,6 +790,8 @@ public final class RequestConverter {
if (server != null) { if (server != null) {
builder.setServerStartCode(server.getStartcode()); builder.setServerStartCode(server.getStartcode());
} }
// send the master's wall clock time as well, so that the RS can refer to it
builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
return builder.build(); return builder.build();
} }
@ -811,6 +814,7 @@ public final class RequestConverter {
if (server != null) { if (server != null) {
builder.setServerStartCode(server.getStartcode()); builder.setServerStartCode(server.getStartcode());
} }
builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
return builder.build(); return builder.build();
} }

View File

@ -3874,6 +3874,24 @@ public final class AdminProtos {
* </pre> * </pre>
*/ */
long getServerStartCode(); long getServerStartCode();
// optional uint64 master_system_time = 5;
/**
* <code>optional uint64 master_system_time = 5;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
boolean hasMasterSystemTime();
/**
* <code>optional uint64 master_system_time = 5;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
long getMasterSystemTime();
} }
/** /**
* Protobuf type {@code OpenRegionRequest} * Protobuf type {@code OpenRegionRequest}
@ -3939,6 +3957,11 @@ public final class AdminProtos {
serverStartCode_ = input.readUInt64(); serverStartCode_ = input.readUInt64();
break; break;
} }
case 40: {
bitField0_ |= 0x00000002;
masterSystemTime_ = input.readUInt64();
break;
}
} }
} }
} catch (com.google.protobuf.InvalidProtocolBufferException e) { } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -5195,9 +5218,34 @@ public final class AdminProtos {
return serverStartCode_; return serverStartCode_;
} }
// optional uint64 master_system_time = 5;
public static final int MASTER_SYSTEM_TIME_FIELD_NUMBER = 5;
private long masterSystemTime_;
/**
* <code>optional uint64 master_system_time = 5;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
public boolean hasMasterSystemTime() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
/**
* <code>optional uint64 master_system_time = 5;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
public long getMasterSystemTime() {
return masterSystemTime_;
}
private void initFields() { private void initFields() {
openInfo_ = java.util.Collections.emptyList(); openInfo_ = java.util.Collections.emptyList();
serverStartCode_ = 0L; serverStartCode_ = 0L;
masterSystemTime_ = 0L;
} }
private byte memoizedIsInitialized = -1; private byte memoizedIsInitialized = -1;
public final boolean isInitialized() { public final boolean isInitialized() {
@ -5223,6 +5271,9 @@ public final class AdminProtos {
if (((bitField0_ & 0x00000001) == 0x00000001)) { if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeUInt64(2, serverStartCode_); output.writeUInt64(2, serverStartCode_);
} }
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeUInt64(5, masterSystemTime_);
}
getUnknownFields().writeTo(output); getUnknownFields().writeTo(output);
} }
@ -5240,6 +5291,10 @@ public final class AdminProtos {
size += com.google.protobuf.CodedOutputStream size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(2, serverStartCode_); .computeUInt64Size(2, serverStartCode_);
} }
if (((bitField0_ & 0x00000002) == 0x00000002)) {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(5, masterSystemTime_);
}
size += getUnknownFields().getSerializedSize(); size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size; memoizedSerializedSize = size;
return size; return size;
@ -5270,6 +5325,11 @@ public final class AdminProtos {
result = result && (getServerStartCode() result = result && (getServerStartCode()
== other.getServerStartCode()); == other.getServerStartCode());
} }
result = result && (hasMasterSystemTime() == other.hasMasterSystemTime());
if (hasMasterSystemTime()) {
result = result && (getMasterSystemTime()
== other.getMasterSystemTime());
}
result = result && result = result &&
getUnknownFields().equals(other.getUnknownFields()); getUnknownFields().equals(other.getUnknownFields());
return result; return result;
@ -5291,6 +5351,10 @@ public final class AdminProtos {
hash = (37 * hash) + SERVERSTARTCODE_FIELD_NUMBER; hash = (37 * hash) + SERVERSTARTCODE_FIELD_NUMBER;
hash = (53 * hash) + hashLong(getServerStartCode()); hash = (53 * hash) + hashLong(getServerStartCode());
} }
if (hasMasterSystemTime()) {
hash = (37 * hash) + MASTER_SYSTEM_TIME_FIELD_NUMBER;
hash = (53 * hash) + hashLong(getMasterSystemTime());
}
hash = (29 * hash) + getUnknownFields().hashCode(); hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash; memoizedHashCode = hash;
return hash; return hash;
@ -5409,6 +5473,8 @@ public final class AdminProtos {
} }
serverStartCode_ = 0L; serverStartCode_ = 0L;
bitField0_ = (bitField0_ & ~0x00000002); bitField0_ = (bitField0_ & ~0x00000002);
masterSystemTime_ = 0L;
bitField0_ = (bitField0_ & ~0x00000004);
return this; return this;
} }
@ -5450,6 +5516,10 @@ public final class AdminProtos {
to_bitField0_ |= 0x00000001; to_bitField0_ |= 0x00000001;
} }
result.serverStartCode_ = serverStartCode_; result.serverStartCode_ = serverStartCode_;
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
to_bitField0_ |= 0x00000002;
}
result.masterSystemTime_ = masterSystemTime_;
result.bitField0_ = to_bitField0_; result.bitField0_ = to_bitField0_;
onBuilt(); onBuilt();
return result; return result;
@ -5495,6 +5565,9 @@ public final class AdminProtos {
if (other.hasServerStartCode()) { if (other.hasServerStartCode()) {
setServerStartCode(other.getServerStartCode()); setServerStartCode(other.getServerStartCode());
} }
if (other.hasMasterSystemTime()) {
setMasterSystemTime(other.getMasterSystemTime());
}
this.mergeUnknownFields(other.getUnknownFields()); this.mergeUnknownFields(other.getUnknownFields());
return this; return this;
} }
@ -5817,6 +5890,55 @@ public final class AdminProtos {
return this; return this;
} }
// optional uint64 master_system_time = 5;
private long masterSystemTime_ ;
/**
* <code>optional uint64 master_system_time = 5;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
public boolean hasMasterSystemTime() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional uint64 master_system_time = 5;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
public long getMasterSystemTime() {
return masterSystemTime_;
}
/**
* <code>optional uint64 master_system_time = 5;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
public Builder setMasterSystemTime(long value) {
bitField0_ |= 0x00000004;
masterSystemTime_ = value;
onChanged();
return this;
}
/**
* <code>optional uint64 master_system_time = 5;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
public Builder clearMasterSystemTime() {
bitField0_ = (bitField0_ & ~0x00000004);
masterSystemTime_ = 0L;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:OpenRegionRequest) // @@protoc_insertion_point(builder_scope:OpenRegionRequest)
} }
@ -23249,87 +23371,88 @@ public final class AdminProtos {
"FileResponse\022\022\n\nstore_file\030\001 \003(\t\"\030\n\026GetO" + "FileResponse\022\022\n\nstore_file\030\001 \003(\t\"\030\n\026GetO" +
"nlineRegionRequest\";\n\027GetOnlineRegionRes" + "nlineRegionRequest\";\n\027GetOnlineRegionRes" +
"ponse\022 \n\013region_info\030\001 \003(\0132\013.RegionInfo\"" + "ponse\022 \n\013region_info\030\001 \003(\0132\013.RegionInfo\"" +
"\374\001\n\021OpenRegionRequest\0224\n\topen_info\030\001 \003(\013" + "\230\002\n\021OpenRegionRequest\0224\n\topen_info\030\001 \003(\013" +
"2!.OpenRegionRequest.RegionOpenInfo\022\027\n\017s" + "2!.OpenRegionRequest.RegionOpenInfo\022\027\n\017s" +
"erverStartCode\030\002 \001(\004\032\227\001\n\016RegionOpenInfo\022" + "erverStartCode\030\002 \001(\004\022\032\n\022master_system_ti" +
"\033\n\006region\030\001 \002(\0132\013.RegionInfo\022\037\n\027version_" + "me\030\005 \001(\004\032\227\001\n\016RegionOpenInfo\022\033\n\006region\030\001 " +
"of_offline_node\030\002 \001(\r\022\"\n\rfavored_nodes\030\003" + "\002(\0132\013.RegionInfo\022\037\n\027version_of_offline_n" +
" \003(\0132\013.ServerName\022#\n\033openForDistributedL", "ode\030\002 \001(\r\022\"\n\rfavored_nodes\030\003 \003(\0132\013.Serve",
"ogReplay\030\004 \001(\010\"\235\001\n\022OpenRegionResponse\022=\n" + "rName\022#\n\033openForDistributedLogReplay\030\004 \001" +
"\ropening_state\030\001 \003(\0162&.OpenRegionRespons" + "(\010\"\235\001\n\022OpenRegionResponse\022=\n\ropening_sta" +
"e.RegionOpeningState\"H\n\022RegionOpeningSta" + "te\030\001 \003(\0162&.OpenRegionResponse.RegionOpen" +
"te\022\n\n\006OPENED\020\000\022\022\n\016ALREADY_OPENED\020\001\022\022\n\016FA" + "ingState\"H\n\022RegionOpeningState\022\n\n\006OPENED" +
"ILED_OPENING\020\002\"6\n\023WarmupRegionRequest\022\037\n" + "\020\000\022\022\n\016ALREADY_OPENED\020\001\022\022\n\016FAILED_OPENING" +
"\nregionInfo\030\001 \002(\0132\013.RegionInfo\"\026\n\024Warmup" + "\020\002\"6\n\023WarmupRegionRequest\022\037\n\nregionInfo\030" +
"RegionResponse\"\271\001\n\022CloseRegionRequest\022 \n" + "\001 \002(\0132\013.RegionInfo\"\026\n\024WarmupRegionRespon" +
"\006region\030\001 \002(\0132\020.RegionSpecifier\022\037\n\027versi" + "se\"\271\001\n\022CloseRegionRequest\022 \n\006region\030\001 \002(" +
"on_of_closing_node\030\002 \001(\r\022\036\n\020transition_i" + "\0132\020.RegionSpecifier\022\037\n\027version_of_closin" +
"n_ZK\030\003 \001(\010:\004true\022\'\n\022destination_server\030\004", "g_node\030\002 \001(\r\022\036\n\020transition_in_ZK\030\003 \001(\010:\004",
" \001(\0132\013.ServerName\022\027\n\017serverStartCode\030\005 \001" + "true\022\'\n\022destination_server\030\004 \001(\0132\013.Serve" +
"(\004\"%\n\023CloseRegionResponse\022\016\n\006closed\030\001 \002(" + "rName\022\027\n\017serverStartCode\030\005 \001(\004\"%\n\023CloseR" +
"\010\"p\n\022FlushRegionRequest\022 \n\006region\030\001 \002(\0132" + "egionResponse\022\016\n\006closed\030\001 \002(\010\"p\n\022FlushRe" +
"\020.RegionSpecifier\022\030\n\020if_older_than_ts\030\002 " + "gionRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpec" +
"\001(\004\022\036\n\026write_flush_wal_marker\030\003 \001(\010\"_\n\023F" + "ifier\022\030\n\020if_older_than_ts\030\002 \001(\004\022\036\n\026write" +
"lushRegionResponse\022\027\n\017last_flush_time\030\001 " + "_flush_wal_marker\030\003 \001(\010\"_\n\023FlushRegionRe" +
"\002(\004\022\017\n\007flushed\030\002 \001(\010\022\036\n\026wrote_flush_wal_" + "sponse\022\027\n\017last_flush_time\030\001 \002(\004\022\017\n\007flush" +
"marker\030\003 \001(\010\"K\n\022SplitRegionRequest\022 \n\006re" + "ed\030\002 \001(\010\022\036\n\026wrote_flush_wal_marker\030\003 \001(\010" +
"gion\030\001 \002(\0132\020.RegionSpecifier\022\023\n\013split_po" + "\"K\n\022SplitRegionRequest\022 \n\006region\030\001 \002(\0132\020" +
"int\030\002 \001(\014\"\025\n\023SplitRegionResponse\"W\n\024Comp", ".RegionSpecifier\022\023\n\013split_point\030\002 \001(\014\"\025\n",
"actRegionRequest\022 \n\006region\030\001 \002(\0132\020.Regio" + "\023SplitRegionResponse\"W\n\024CompactRegionReq" +
"nSpecifier\022\r\n\005major\030\002 \001(\010\022\016\n\006family\030\003 \001(" + "uest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\r" +
"\014\"\027\n\025CompactRegionResponse\"\262\001\n\031UpdateFav" + "\n\005major\030\002 \001(\010\022\016\n\006family\030\003 \001(\014\"\027\n\025Compact" +
"oredNodesRequest\022@\n\013update_info\030\001 \003(\0132+." + "RegionResponse\"\262\001\n\031UpdateFavoredNodesReq" +
"UpdateFavoredNodesRequest.RegionUpdateIn" + "uest\022@\n\013update_info\030\001 \003(\0132+.UpdateFavore" +
"fo\032S\n\020RegionUpdateInfo\022\033\n\006region\030\001 \002(\0132\013" + "dNodesRequest.RegionUpdateInfo\032S\n\020Region" +
".RegionInfo\022\"\n\rfavored_nodes\030\002 \003(\0132\013.Ser" + "UpdateInfo\022\033\n\006region\030\001 \002(\0132\013.RegionInfo\022" +
"verName\".\n\032UpdateFavoredNodesResponse\022\020\n" + "\"\n\rfavored_nodes\030\002 \003(\0132\013.ServerName\".\n\032U" +
"\010response\030\001 \001(\r\"v\n\023MergeRegionsRequest\022\"" + "pdateFavoredNodesResponse\022\020\n\010response\030\001 " +
"\n\010region_a\030\001 \002(\0132\020.RegionSpecifier\022\"\n\010re", "\001(\r\"v\n\023MergeRegionsRequest\022\"\n\010region_a\030\001",
"gion_b\030\002 \002(\0132\020.RegionSpecifier\022\027\n\010forcib" + " \002(\0132\020.RegionSpecifier\022\"\n\010region_b\030\002 \002(\013" +
"le\030\003 \001(\010:\005false\"\026\n\024MergeRegionsResponse\"" + "2\020.RegionSpecifier\022\027\n\010forcible\030\003 \001(\010:\005fa" +
"X\n\010WALEntry\022\024\n\003key\030\001 \002(\0132\007.WALKey\022\027\n\017key" + "lse\"\026\n\024MergeRegionsResponse\"X\n\010WALEntry\022" +
"_value_bytes\030\002 \003(\014\022\035\n\025associated_cell_co" + "\024\n\003key\030\001 \002(\0132\007.WALKey\022\027\n\017key_value_bytes" +
"unt\030\003 \001(\005\"4\n\030ReplicateWALEntryRequest\022\030\n" + "\030\002 \003(\014\022\035\n\025associated_cell_count\030\003 \001(\005\"4\n" +
"\005entry\030\001 \003(\0132\t.WALEntry\"\033\n\031ReplicateWALE" + "\030ReplicateWALEntryRequest\022\030\n\005entry\030\001 \003(\013" +
"ntryResponse\"\026\n\024RollWALWriterRequest\"0\n\025" + "2\t.WALEntry\"\033\n\031ReplicateWALEntryResponse" +
"RollWALWriterResponse\022\027\n\017region_to_flush" + "\"\026\n\024RollWALWriterRequest\"0\n\025RollWALWrite" +
"\030\001 \003(\014\"#\n\021StopServerRequest\022\016\n\006reason\030\001 " + "rResponse\022\027\n\017region_to_flush\030\001 \003(\014\"#\n\021St" +
"\002(\t\"\024\n\022StopServerResponse\"\026\n\024GetServerIn", "opServerRequest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopS",
"foRequest\"B\n\nServerInfo\022 \n\013server_name\030\001" + "erverResponse\"\026\n\024GetServerInfoRequest\"B\n" +
" \002(\0132\013.ServerName\022\022\n\nwebui_port\030\002 \001(\r\"9\n" + "\nServerInfo\022 \n\013server_name\030\001 \002(\0132\013.Serve" +
"\025GetServerInfoResponse\022 \n\013server_info\030\001 " + "rName\022\022\n\nwebui_port\030\002 \001(\r\"9\n\025GetServerIn" +
"\002(\0132\013.ServerInfo\"\034\n\032UpdateConfigurationR" + "foResponse\022 \n\013server_info\030\001 \002(\0132\013.Server" +
"equest\"\035\n\033UpdateConfigurationResponse2\325\010" + "Info\"\034\n\032UpdateConfigurationRequest\"\035\n\033Up" +
"\n\014AdminService\022>\n\rGetRegionInfo\022\025.GetReg" + "dateConfigurationResponse2\325\010\n\014AdminServi" +
"ionInfoRequest\032\026.GetRegionInfoResponse\022;" + "ce\022>\n\rGetRegionInfo\022\025.GetRegionInfoReque" +
"\n\014GetStoreFile\022\024.GetStoreFileRequest\032\025.G" + "st\032\026.GetRegionInfoResponse\022;\n\014GetStoreFi" +
"etStoreFileResponse\022D\n\017GetOnlineRegion\022\027" + "le\022\024.GetStoreFileRequest\032\025.GetStoreFileR" +
".GetOnlineRegionRequest\032\030.GetOnlineRegio", "esponse\022D\n\017GetOnlineRegion\022\027.GetOnlineRe",
"nResponse\0225\n\nOpenRegion\022\022.OpenRegionRequ" + "gionRequest\032\030.GetOnlineRegionResponse\0225\n" +
"est\032\023.OpenRegionResponse\022;\n\014WarmupRegion" + "\nOpenRegion\022\022.OpenRegionRequest\032\023.OpenRe" +
"\022\024.WarmupRegionRequest\032\025.WarmupRegionRes" + "gionResponse\022;\n\014WarmupRegion\022\024.WarmupReg" +
"ponse\0228\n\013CloseRegion\022\023.CloseRegionReques" + "ionRequest\032\025.WarmupRegionResponse\0228\n\013Clo" +
"t\032\024.CloseRegionResponse\0228\n\013FlushRegion\022\023" + "seRegion\022\023.CloseRegionRequest\032\024.CloseReg" +
".FlushRegionRequest\032\024.FlushRegionRespons" + "ionResponse\0228\n\013FlushRegion\022\023.FlushRegion" +
"e\0228\n\013SplitRegion\022\023.SplitRegionRequest\032\024." + "Request\032\024.FlushRegionResponse\0228\n\013SplitRe" +
"SplitRegionResponse\022>\n\rCompactRegion\022\025.C" + "gion\022\023.SplitRegionRequest\032\024.SplitRegionR" +
"ompactRegionRequest\032\026.CompactRegionRespo" + "esponse\022>\n\rCompactRegion\022\025.CompactRegion" +
"nse\022;\n\014MergeRegions\022\024.MergeRegionsReques", "Request\032\026.CompactRegionResponse\022;\n\014Merge",
"t\032\025.MergeRegionsResponse\022J\n\021ReplicateWAL" + "Regions\022\024.MergeRegionsRequest\032\025.MergeReg" +
"Entry\022\031.ReplicateWALEntryRequest\032\032.Repli" + "ionsResponse\022J\n\021ReplicateWALEntry\022\031.Repl" +
"cateWALEntryResponse\022?\n\006Replay\022\031.Replica" + "icateWALEntryRequest\032\032.ReplicateWALEntry" +
"teWALEntryRequest\032\032.ReplicateWALEntryRes" + "Response\022?\n\006Replay\022\031.ReplicateWALEntryRe" +
"ponse\022>\n\rRollWALWriter\022\025.RollWALWriterRe" + "quest\032\032.ReplicateWALEntryResponse\022>\n\rRol" +
"quest\032\026.RollWALWriterResponse\022>\n\rGetServ" + "lWALWriter\022\025.RollWALWriterRequest\032\026.Roll" +
"erInfo\022\025.GetServerInfoRequest\032\026.GetServe" + "WALWriterResponse\022>\n\rGetServerInfo\022\025.Get" +
"rInfoResponse\0225\n\nStopServer\022\022.StopServer" + "ServerInfoRequest\032\026.GetServerInfoRespons" +
"Request\032\023.StopServerResponse\022M\n\022UpdateFa" + "e\0225\n\nStopServer\022\022.StopServerRequest\032\023.St" +
"voredNodes\022\032.UpdateFavoredNodesRequest\032\033", "opServerResponse\022M\n\022UpdateFavoredNodes\022\032",
".UpdateFavoredNodesResponse\022P\n\023UpdateCon" + ".UpdateFavoredNodesRequest\032\033.UpdateFavor" +
"figuration\022\033.UpdateConfigurationRequest\032" + "edNodesResponse\022P\n\023UpdateConfiguration\022\033" +
"\034.UpdateConfigurationResponseBA\n*org.apa" + ".UpdateConfigurationRequest\032\034.UpdateConf" +
"che.hadoop.hbase.protobuf.generatedB\013Adm" + "igurationResponseBA\n*org.apache.hadoop.h" +
"inProtosH\001\210\001\001\240\001\001" "base.protobuf.generatedB\013AdminProtosH\001\210\001" +
"\001\240\001\001"
}; };
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -23377,7 +23500,7 @@ public final class AdminProtos {
internal_static_OpenRegionRequest_fieldAccessorTable = new internal_static_OpenRegionRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable( com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_OpenRegionRequest_descriptor, internal_static_OpenRegionRequest_descriptor,
new java.lang.String[] { "OpenInfo", "ServerStartCode", }); new java.lang.String[] { "OpenInfo", "ServerStartCode", "MasterSystemTime", });
internal_static_OpenRegionRequest_RegionOpenInfo_descriptor = internal_static_OpenRegionRequest_RegionOpenInfo_descriptor =
internal_static_OpenRegionRequest_descriptor.getNestedTypes().get(0); internal_static_OpenRegionRequest_descriptor.getNestedTypes().get(0);
internal_static_OpenRegionRequest_RegionOpenInfo_fieldAccessorTable = new internal_static_OpenRegionRequest_RegionOpenInfo_fieldAccessorTable = new

View File

@ -70,6 +70,8 @@ message OpenRegionRequest {
repeated RegionOpenInfo open_info = 1; repeated RegionOpenInfo open_info = 1;
// the intended server for this RPC. // the intended server for this RPC.
optional uint64 serverStartCode = 2; optional uint64 serverStartCode = 2;
// wall clock time from master
optional uint64 master_system_time = 5;
message RegionOpenInfo { message RegionOpenInfo {
required RegionInfo region = 1; required RegionInfo region = 1;

View File

@ -209,7 +209,7 @@ public class RegionStateStore {
if (openSeqNum >= 0) { if (openSeqNum >= 0) {
Preconditions.checkArgument(state == State.OPEN Preconditions.checkArgument(state == State.OPEN
&& serverName != null, "Open region should be on a server"); && serverName != null, "Open region should be on a server");
MetaTableAccessor.addLocation(put, serverName, openSeqNum, replicaId); MetaTableAccessor.addLocation(put, serverName, openSeqNum, -1, replicaId);
info.append("&openSeqNum=").append(openSeqNum); info.append("&openSeqNum=").append(openSeqNum);
info.append("&server=").append(serverName); info.append("&server=").append(serverName);
} }

View File

@ -1870,6 +1870,14 @@ public class HRegionServer extends HasThread implements
@Override @Override
public void postOpenDeployTasks(final Region r) throws KeeperException, IOException { public void postOpenDeployTasks(final Region r) throws KeeperException, IOException {
postOpenDeployTasks(new PostOpenDeployContext(r, -1));
}
@Override
public void postOpenDeployTasks(final PostOpenDeployContext context)
throws KeeperException, IOException {
Region r = context.getRegion();
long masterSystemTime = context.getMasterSystemTime();
Preconditions.checkArgument(r instanceof HRegion, "r must be an HRegion"); Preconditions.checkArgument(r instanceof HRegion, "r must be an HRegion");
rpcServices.checkOpen(); rpcServices.checkOpen();
LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString()); LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString());
@ -1896,10 +1904,10 @@ public class HRegionServer extends HasThread implements
State.OPEN); State.OPEN);
} else if (useZKForAssignment) { } else if (useZKForAssignment) {
MetaTableAccessor.updateRegionLocation(getConnection(), r.getRegionInfo(), MetaTableAccessor.updateRegionLocation(getConnection(), r.getRegionInfo(),
this.serverName, openSeqNum); this.serverName, openSeqNum, masterSystemTime);
} }
if (!useZKForAssignment && !reportRegionStateTransition( if (!useZKForAssignment && !reportRegionStateTransition(new RegionStateTransitionContext(
TransitionCode.OPENED, openSeqNum, r.getRegionInfo())) { TransitionCode.OPENED, openSeqNum, masterSystemTime, r.getRegionInfo()))) {
throw new IOException("Failed to report opened region to master: " throw new IOException("Failed to report opened region to master: "
+ r.getRegionInfo().getRegionNameAsString()); + r.getRegionInfo().getRegionNameAsString());
} }
@ -1917,6 +1925,17 @@ public class HRegionServer extends HasThread implements
@Override @Override
public boolean reportRegionStateTransition( public boolean reportRegionStateTransition(
TransitionCode code, long openSeqNum, HRegionInfo... hris) { TransitionCode code, long openSeqNum, HRegionInfo... hris) {
return reportRegionStateTransition(
new RegionStateTransitionContext(code, HConstants.NO_SEQNUM, -1, hris));
}
@Override
public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
TransitionCode code = context.getCode();
long openSeqNum = context.getOpenSeqNum();
long masterSystemTime = context.getMasterSystemTime();
HRegionInfo[] hris = context.getHris();
ReportRegionStateTransitionRequest.Builder builder = ReportRegionStateTransitionRequest.Builder builder =
ReportRegionStateTransitionRequest.newBuilder(); ReportRegionStateTransitionRequest.newBuilder();
builder.setServer(ProtobufUtil.toServerName(serverName)); builder.setServer(ProtobufUtil.toServerName(serverName));

View File

@ -1410,6 +1410,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} }
} }
} }
long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) { for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion()); final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion());
OpenRegionCoordination coordination = regionServer.getCoordinatedStateManager(). OpenRegionCoordination coordination = regionServer.getCoordinatedStateManager().
@ -1501,12 +1504,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// Need to pass the expected version in the constructor. // Need to pass the expected version in the constructor.
if (region.isMetaRegion()) { if (region.isMetaRegion()) {
regionServer.service.submit(new OpenMetaHandler( regionServer.service.submit(new OpenMetaHandler(
regionServer, regionServer, region, htd, coordination, ord)); regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
} else { } else {
regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(), regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
regionOpenInfo.getFavoredNodesList()); regionOpenInfo.getFavoredNodesList());
regionServer.service.submit(new OpenRegionHandler( regionServer.service.submit(new OpenRegionHandler(
regionServer, regionServer, region, htd, coordination, ord)); regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
} }
} }

View File

@ -79,6 +79,36 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
*/ */
RegionServerQuotaManager getRegionServerQuotaManager(); RegionServerQuotaManager getRegionServerQuotaManager();
/**
* Context for postOpenDeployTasks().
*/
class PostOpenDeployContext {
private final Region region;
private final long masterSystemTime;
@InterfaceAudience.Private
public PostOpenDeployContext(Region region, long masterSystemTime) {
this.region = region;
this.masterSystemTime = masterSystemTime;
}
public Region getRegion() {
return region;
}
public long getMasterSystemTime() {
return masterSystemTime;
}
}
/**
* Tasks to perform after region open to complete deploy of region on
* regionserver
*
* @param context the context
* @throws KeeperException
* @throws IOException
*/
void postOpenDeployTasks(final PostOpenDeployContext context) throws KeeperException, IOException;
/** /**
* Tasks to perform after region open to complete deploy of region on * Tasks to perform after region open to complete deploy of region on
* regionserver * regionserver
@ -86,17 +116,56 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
* @param r Region to open. * @param r Region to open.
* @throws KeeperException * @throws KeeperException
* @throws IOException * @throws IOException
* @deprecated use {@link #postOpenDeployTasks(PostOpenDeployContext)}
*/ */
@Deprecated
void postOpenDeployTasks(final Region r) throws KeeperException, IOException; void postOpenDeployTasks(final Region r) throws KeeperException, IOException;
class RegionStateTransitionContext {
private final TransitionCode code;
private final long openSeqNum;
private final long masterSystemTime;
private final HRegionInfo[] hris;
@InterfaceAudience.Private
public RegionStateTransitionContext(TransitionCode code, long openSeqNum, long masterSystemTime,
HRegionInfo... hris) {
this.code = code;
this.openSeqNum = openSeqNum;
this.masterSystemTime = masterSystemTime;
this.hris = hris;
}
public TransitionCode getCode() {
return code;
}
public long getOpenSeqNum() {
return openSeqNum;
}
public long getMasterSystemTime() {
return masterSystemTime;
}
public HRegionInfo[] getHris() {
return hris;
}
}
/** /**
* Notify master that a handler requests to change a region state * Notify master that a handler requests to change a region state
*/ */
boolean reportRegionStateTransition(final RegionStateTransitionContext context);
/**
* Notify master that a handler requests to change a region state
* @deprecated use {@link #reportRegionStateTransition(RegionStateTransitionContext)}
*/
@Deprecated
boolean reportRegionStateTransition(TransitionCode code, long openSeqNum, HRegionInfo... hris); boolean reportRegionStateTransition(TransitionCode code, long openSeqNum, HRegionInfo... hris);
/** /**
* Notify master that a handler requests to change a region state * Notify master that a handler requests to change a region state
* @deprecated use {@link #reportRegionStateTransition(RegionStateTransitionContext)}
*/ */
@Deprecated
boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris); boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris);
/** /**

View File

@ -35,9 +35,9 @@ import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
public class OpenMetaHandler extends OpenRegionHandler { public class OpenMetaHandler extends OpenRegionHandler {
public OpenMetaHandler(final Server server, public OpenMetaHandler(final Server server,
final RegionServerServices rsServices, HRegionInfo regionInfo, final RegionServerServices rsServices, HRegionInfo regionInfo,
final HTableDescriptor htd, OpenRegionCoordination coordination, final HTableDescriptor htd, long masterSystemTime, OpenRegionCoordination coordination,
OpenRegionCoordination.OpenRegionDetails ord) { OpenRegionCoordination.OpenRegionDetails ord) {
super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_META, super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_META,
coordination, ord); masterSystemTime, coordination, ord);
} }
} }

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext;
import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ConfigUtil; import org.apache.hadoop.hbase.util.ConfigUtil;
/** /**
@ -49,6 +50,7 @@ public class OpenRegionHandler extends EventHandler {
private final HRegionInfo regionInfo; private final HRegionInfo regionInfo;
private final HTableDescriptor htd; private final HTableDescriptor htd;
private final long masterSystemTime;
private OpenRegionCoordination coordination; private OpenRegionCoordination coordination;
private OpenRegionCoordination.OpenRegionDetails ord; private OpenRegionCoordination.OpenRegionDetails ord;
@ -57,15 +59,15 @@ public class OpenRegionHandler extends EventHandler {
public OpenRegionHandler(final Server server, public OpenRegionHandler(final Server server,
final RegionServerServices rsServices, HRegionInfo regionInfo, final RegionServerServices rsServices, HRegionInfo regionInfo,
HTableDescriptor htd, OpenRegionCoordination coordination, HTableDescriptor htd, long masterSystemTime, OpenRegionCoordination coordination,
OpenRegionCoordination.OpenRegionDetails ord) { OpenRegionCoordination.OpenRegionDetails ord) {
this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION, this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION,
coordination, ord); masterSystemTime, coordination, ord);
} }
protected OpenRegionHandler(final Server server, protected OpenRegionHandler(final Server server,
final RegionServerServices rsServices, final HRegionInfo regionInfo, final RegionServerServices rsServices, final HRegionInfo regionInfo,
final HTableDescriptor htd, EventType eventType, final HTableDescriptor htd, EventType eventType, long masterSystemTime,
OpenRegionCoordination coordination, OpenRegionCoordination.OpenRegionDetails ord) { OpenRegionCoordination coordination, OpenRegionCoordination.OpenRegionDetails ord) {
super(server, eventType); super(server, eventType);
this.rsServices = rsServices; this.rsServices = rsServices;
@ -74,6 +76,7 @@ public class OpenRegionHandler extends EventHandler {
this.coordination = coordination; this.coordination = coordination;
this.ord = ord; this.ord = ord;
useZKForAssignment = ConfigUtil.useZKForAssignment(server.getConfiguration()); useZKForAssignment = ConfigUtil.useZKForAssignment(server.getConfiguration());
this.masterSystemTime = masterSystemTime;
} }
public HRegionInfo getRegionInfo() { public HRegionInfo getRegionInfo() {
@ -131,7 +134,7 @@ public class OpenRegionHandler extends EventHandler {
boolean failed = true; boolean failed = true;
if (isRegionStillOpening() && (!useZKForAssignment || if (isRegionStillOpening() && (!useZKForAssignment ||
coordination.tickleOpening(ord, regionInfo, rsServices, "post_region_open"))) { coordination.tickleOpening(ord, regionInfo, rsServices, "post_region_open"))) {
if (updateMeta(region)) { if (updateMeta(region, masterSystemTime)) {
failed = false; failed = false;
} }
} }
@ -231,7 +234,7 @@ public class OpenRegionHandler extends EventHandler {
* state meantime so master doesn't timeout our region-in-transition. * state meantime so master doesn't timeout our region-in-transition.
* Caller must cleanup region if this fails. * Caller must cleanup region if this fails.
*/ */
boolean updateMeta(final HRegion r) { boolean updateMeta(final HRegion r, long masterSystemTime) {
if (this.server.isStopped() || this.rsServices.isStopping()) { if (this.server.isStopped() || this.rsServices.isStopping()) {
return false; return false;
} }
@ -239,7 +242,7 @@ public class OpenRegionHandler extends EventHandler {
// Else, wait. // Else, wait.
final AtomicBoolean signaller = new AtomicBoolean(false); final AtomicBoolean signaller = new AtomicBoolean(false);
PostOpenDeployTasksThread t = new PostOpenDeployTasksThread(r, PostOpenDeployTasksThread t = new PostOpenDeployTasksThread(r,
this.server, this.rsServices, signaller); this.server, this.rsServices, signaller, masterSystemTime);
t.start(); t.start();
// Post open deploy task: // Post open deploy task:
// meta => update meta location in ZK // meta => update meta location in ZK
@ -305,20 +308,23 @@ public class OpenRegionHandler extends EventHandler {
private final RegionServerServices services; private final RegionServerServices services;
private final HRegion region; private final HRegion region;
private final AtomicBoolean signaller; private final AtomicBoolean signaller;
private final long masterSystemTime;
PostOpenDeployTasksThread(final HRegion region, final Server server, PostOpenDeployTasksThread(final HRegion region, final Server server,
final RegionServerServices services, final AtomicBoolean signaller) { final RegionServerServices services, final AtomicBoolean signaller, long masterSystemTime) {
super("PostOpenDeployTasks:" + region.getRegionInfo().getEncodedName()); super("PostOpenDeployTasks:" + region.getRegionInfo().getEncodedName());
this.setDaemon(true); this.setDaemon(true);
this.server = server; this.server = server;
this.services = services; this.services = services;
this.region = region; this.region = region;
this.signaller = signaller; this.signaller = signaller;
this.masterSystemTime = masterSystemTime;
} }
@Override
public void run() { public void run() {
try { try {
this.services.postOpenDeployTasks(this.region); this.services.postOpenDeployTasks(new PostOpenDeployContext(region, masterSystemTime));
} catch (Throwable e) { } catch (Throwable e) {
String msg = "Exception running postOpenDeployTasks; region=" + String msg = "Exception running postOpenDeployTasks; region=" +
this.region.getRegionInfo().getEncodedName(); this.region.getRegionInfo().getEncodedName();
@ -358,6 +364,7 @@ public class OpenRegionHandler extends EventHandler {
this.server.getConfiguration(), this.server.getConfiguration(),
this.rsServices, this.rsServices,
new CancelableProgressable() { new CancelableProgressable() {
@Override
public boolean progress() { public boolean progress() {
if (useZKForAssignment) { if (useZKForAssignment) {
// if tickle failed, we need to cancel opening region. // if tickle failed, we need to cancel opening region.

View File

@ -180,7 +180,7 @@ public class HBaseFsckRepair {
// see the additional replicas when it is asked to assign. The // see the additional replicas when it is asked to assign. The
// final value of these columns will be different and will be updated // final value of these columns will be different and will be updated
// by the actual regionservers that start hosting the respective replicas // by the actual regionservers that start hosting the respective replicas
MetaTableAccessor.addLocation(put, sn, sn.getStartcode(), i); MetaTableAccessor.addLocation(put, sn, sn.getStartcode(), -1, i);
} }
} }
meta.put(put); meta.put(put);

View File

@ -118,6 +118,12 @@ public class MockRegionServerServices implements RegionServerServices {
addToOnlineRegions(r); addToOnlineRegions(r);
} }
@Override
public void postOpenDeployTasks(PostOpenDeployContext context) throws KeeperException,
IOException {
addToOnlineRegions(context.getRegion());
}
@Override @Override
public boolean isStopping() { public boolean isStopping() {
return this.stopping; return this.stopping;
@ -278,6 +284,11 @@ public class MockRegionServerServices implements RegionServerServices {
return false; return false;
} }
@Override
public boolean reportRegionStateTransition(RegionStateTransitionContext context) {
return false;
}
@Override @Override
public boolean registerService(Service service) { public boolean registerService(Service service) {
// TODO Auto-generated method stub // TODO Auto-generated method stub

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
@ -344,20 +345,20 @@ public class TestMetaTableAccessor {
Table meta = MetaTableAccessor.getMetaHTable(connection); Table meta = MetaTableAccessor.getMetaHTable(connection);
try { try {
MetaTableAccessor.updateRegionLocation(connection, primary, serverName0, seqNum0); MetaTableAccessor.updateRegionLocation(connection, primary, serverName0, seqNum0, -1);
// assert that the server, startcode and seqNum columns are there for the primary region // assert that the server, startcode and seqNum columns are there for the primary region
assertMetaLocation(meta, primary.getRegionName(), serverName0, seqNum0, 0, true); assertMetaLocation(meta, primary.getRegionName(), serverName0, seqNum0, 0, true);
// add replica = 1 // add replica = 1
MetaTableAccessor.updateRegionLocation(connection, replica1, serverName1, seqNum1); MetaTableAccessor.updateRegionLocation(connection, replica1, serverName1, seqNum1, -1);
// check whether the primary is still there // check whether the primary is still there
assertMetaLocation(meta, primary.getRegionName(), serverName0, seqNum0, 0, true); assertMetaLocation(meta, primary.getRegionName(), serverName0, seqNum0, 0, true);
// now check for replica 1 // now check for replica 1
assertMetaLocation(meta, primary.getRegionName(), serverName1, seqNum1, 1, true); assertMetaLocation(meta, primary.getRegionName(), serverName1, seqNum1, 1, true);
// add replica = 1 // add replica = 1
MetaTableAccessor.updateRegionLocation(connection, replica100, serverName100, seqNum100); MetaTableAccessor.updateRegionLocation(connection, replica100, serverName100, seqNum100, -1);
// check whether the primary is still there // check whether the primary is still there
assertMetaLocation(meta, primary.getRegionName(), serverName0, seqNum0, 0, true); assertMetaLocation(meta, primary.getRegionName(), serverName0, seqNum0, 0, true);
// check whether the replica 1 is still there // check whether the replica 1 is still there
@ -471,5 +472,42 @@ public class TestMetaTableAccessor {
meta.close(); meta.close();
} }
} }
@Test
public void testMastersSystemTimeIsUsedInUpdateLocations() throws IOException {
long regionId = System.currentTimeMillis();
HRegionInfo regionInfo = new HRegionInfo(TableName.valueOf("table_foo"),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false, regionId, 0);
ServerName sn = ServerName.valueOf("bar", 0, 0);
Table meta = MetaTableAccessor.getMetaHTable(connection);
try {
List<HRegionInfo> regionInfos = Lists.newArrayList(regionInfo);
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1);
long masterSystemTime = EnvironmentEdgeManager.currentTime() + 123456789;
MetaTableAccessor.updateRegionLocation(connection, regionInfo, sn, 1, masterSystemTime);
Get get = new Get(regionInfo.getRegionName());
Result result = meta.get(get);
Cell serverCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY,
MetaTableAccessor.getServerColumn(0));
Cell startCodeCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY,
MetaTableAccessor.getStartCodeColumn(0));
Cell seqNumCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY,
MetaTableAccessor.getSeqNumColumn(0));
assertNotNull(serverCell);
assertNotNull(startCodeCell);
assertNotNull(seqNumCell);
assertTrue(serverCell.getValueLength() > 0);
assertTrue(startCodeCell.getValueLength() > 0);
assertTrue(seqNumCell.getValueLength() > 0);
assertEquals(masterSystemTime, serverCell.getTimestamp());
assertEquals(masterSystemTime, startCodeCell.getTimestamp());
assertEquals(masterSystemTime, seqNumCell.getTimestamp());
} finally {
meta.close();
}
}
} }

View File

@ -339,6 +339,12 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
// TODO Auto-generated method stub // TODO Auto-generated method stub
} }
@Override
public void postOpenDeployTasks(PostOpenDeployContext context) throws KeeperException,
IOException {
// TODO Auto-generated method stub
}
@Override @Override
public RpcServerInterface getRpcServer() { public RpcServerInterface getRpcServer() {
// TODO Auto-generated method stub // TODO Auto-generated method stub
@ -609,6 +615,11 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
return false; return false;
} }
@Override
public boolean reportRegionStateTransition(RegionStateTransitionContext context) {
return false;
}
@Override @Override
public boolean registerService(Service service) { public boolean registerService(Service service) {
// TODO Auto-generated method stub // TODO Auto-generated method stub

View File

@ -238,7 +238,7 @@ public class TestAssignmentManagerOnCluster {
MetaTableAccessor.addRegionToMeta(meta, hri); MetaTableAccessor.addRegionToMeta(meta, hri);
// Add some dummy server for the region entry // Add some dummy server for the region entry
MetaTableAccessor.updateRegionLocation(TEST_UTIL.getHBaseCluster().getMaster().getConnection(), hri, MetaTableAccessor.updateRegionLocation(TEST_UTIL.getHBaseCluster().getMaster().getConnection(), hri,
ServerName.valueOf("example.org", 1234, System.currentTimeMillis()), 0); ServerName.valueOf("example.org", 1234, System.currentTimeMillis()), 0, -1);
RegionStates regionStates = master.getAssignmentManager().getRegionStates(); RegionStates regionStates = master.getAssignmentManager().getRegionStates();
int i = TEST_UTIL.getHBaseCluster().getServerWithMeta(); int i = TEST_UTIL.getHBaseCluster().getServerWithMeta();
HRegionServer rs = TEST_UTIL.getHBaseCluster().getRegionServer(i == 0 ? 1 : 0); HRegionServer rs = TEST_UTIL.getHBaseCluster().getRegionServer(i == 0 ? 1 : 0);

View File

@ -294,7 +294,7 @@ public class TestRegionServerNoMaster {
// we re-opened meta so some of its data is lost // we re-opened meta so some of its data is lost
ServerName sn = getRS().getServerName(); ServerName sn = getRS().getServerName();
MetaTableAccessor.updateRegionLocation(getRS().getConnection(), MetaTableAccessor.updateRegionLocation(getRS().getConnection(),
hri, sn, getRS().getRegion(regionName).getOpenSeqNum()); hri, sn, getRS().getRegion(regionName).getOpenSeqNum(), -1);
// fake region to be closing now, need to clear state afterwards // fake region to be closing now, need to clear state afterwards
getRS().regionsInTransitionInRS.put(hri.getEncodedNameAsBytes(), Boolean.FALSE); getRS().regionsInTransitionInRS.put(hri.getEncodedNameAsBytes(), Boolean.FALSE);
AdminProtos.OpenRegionRequest orr = AdminProtos.OpenRegionRequest orr =
@ -376,7 +376,7 @@ public class TestRegionServerNoMaster {
zkCrd.setVersionOfOfflineNode(0); zkCrd.setVersionOfOfflineNode(0);
getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd, getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd,
csm.getOpenRegionCoordination(), zkCrd)); -1, csm.getOpenRegionCoordination(), zkCrd));
// The open handler should have removed the region from RIT but kept the region closed // The open handler should have removed the region from RIT but kept the region closed
checkRegionIsClosed(HTU, getRS(), hri); checkRegionIsClosed(HTU, getRS(), hri);
@ -441,7 +441,7 @@ public class TestRegionServerNoMaster {
zkCrd.setVersionOfOfflineNode(0); zkCrd.setVersionOfOfflineNode(0);
getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd, getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd,
csm.getOpenRegionCoordination(), zkCrd)); -1, csm.getOpenRegionCoordination(), zkCrd));
// The open handler should have removed the region from RIT but kept the region closed // The open handler should have removed the region from RIT but kept the region closed
checkRegionIsClosed(HTU, getRS(), hri); checkRegionIsClosed(HTU, getRS(), hri);

View File

@ -241,7 +241,7 @@ public class TestCloseRegionHandler {
OpenRegionCoordination.OpenRegionDetails ord = OpenRegionCoordination.OpenRegionDetails ord =
coordination.getDetailsForNonCoordinatedOpening(); coordination.getDetailsForNonCoordinatedOpening();
OpenRegionHandler openHandler = OpenRegionHandler openHandler =
new OpenRegionHandler(server, rss, hri, htd, coordination, ord); new OpenRegionHandler(server, rss, hri, htd, -1, coordination, ord);
rss.getRegionsInTransitionInRS().put(hri.getEncodedNameAsBytes(), Boolean.TRUE); rss.getRegionsInTransitionInRS().put(hri.getEncodedNameAsBytes(), Boolean.TRUE);
openHandler.process(); openHandler.process();
// This parse is not used? // This parse is not used?

View File

@ -109,7 +109,7 @@ public class TestOpenRegionHandler {
zkCrd.setServerName(server.getServerName()); zkCrd.setServerName(server.getServerName());
OpenRegionHandler handler = new OpenRegionHandler(server, rss, hri, OpenRegionHandler handler = new OpenRegionHandler(server, rss, hri,
htd, csm.getOpenRegionCoordination(), zkCrd) { htd, -1, csm.getOpenRegionCoordination(), zkCrd) {
HRegion openRegion() { HRegion openRegion() {
// Open region first, then remove znode as though it'd been hijacked. // Open region first, then remove znode as though it'd been hijacked.
HRegion region = super.openRegion(); HRegion region = super.openRegion();
@ -191,7 +191,7 @@ public class TestOpenRegionHandler {
}; };
OpenRegionHandler handler = new OpenRegionHandler(server, rss, hri, htd, OpenRegionHandler handler = new OpenRegionHandler(server, rss, hri, htd,
openRegionCoordination, zkCrd); -1, openRegionCoordination, zkCrd);
rss.getRegionsInTransitionInRS().put( rss.getRegionsInTransitionInRS().put(
hri.getEncodedNameAsBytes(), Boolean.TRUE); hri.getEncodedNameAsBytes(), Boolean.TRUE);
// Call process without first creating OFFLINE region in zk, see if // Call process without first creating OFFLINE region in zk, see if
@ -231,7 +231,7 @@ public class TestOpenRegionHandler {
// Create the handler // Create the handler
OpenRegionHandler handler = OpenRegionHandler handler =
new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD, new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD, -1,
csm.getOpenRegionCoordination(), zkCrd) { csm.getOpenRegionCoordination(), zkCrd) {
@Override @Override
HRegion openRegion() { HRegion openRegion() {
@ -267,9 +267,9 @@ public class TestOpenRegionHandler {
zkCrd.setServerName(server.getServerName()); zkCrd.setServerName(server.getServerName());
OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD, OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD,
csm.getOpenRegionCoordination(), zkCrd) { -1, csm.getOpenRegionCoordination(), zkCrd) {
@Override @Override
boolean updateMeta(final HRegion r) { boolean updateMeta(final HRegion r, long masterSystemTime) {
// Fake failure of updating META // Fake failure of updating META
return false; return false;
} }
@ -300,9 +300,9 @@ public class TestOpenRegionHandler {
zkCrd.setServerName(server.getServerName()); zkCrd.setServerName(server.getServerName());
OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD, OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD,
csm.getOpenRegionCoordination(), zkCrd) { -1, csm.getOpenRegionCoordination(), zkCrd) {
@Override @Override
boolean updateMeta(HRegion r) { boolean updateMeta(HRegion r, long masterSystemTime) {
return false; return false;
}; };
@ -347,7 +347,7 @@ public class TestOpenRegionHandler {
}; };
OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD, OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD,
openRegionCoordination, zkCrd); -1, openRegionCoordination, zkCrd);
rsServices.getRegionsInTransitionInRS().put(TEST_HRI.getEncodedNameAsBytes(), Boolean.TRUE); rsServices.getRegionsInTransitionInRS().put(TEST_HRI.getEncodedNameAsBytes(), Boolean.TRUE);
handler.process(); handler.process();

View File

@ -765,7 +765,7 @@ public class TestHBaseFsck {
ServerName sn = TEST_UTIL.getHBaseAdmin().getClusterStatus().getServers() ServerName sn = TEST_UTIL.getHBaseAdmin().getClusterStatus().getServers()
.toArray(new ServerName[0])[0]; .toArray(new ServerName[0])[0];
//add a location with replicaId as 2 (since we already have replicas with replicaid 0 and 1) //add a location with replicaId as 2 (since we already have replicas with replicaid 0 and 1)
MetaTableAccessor.addLocation(put, sn, sn.getStartcode(), 2); MetaTableAccessor.addLocation(put, sn, sn.getStartcode(), -1, 2);
meta.put(put); meta.put(put);
meta.flushCommits(); meta.flushCommits();
// assign the new replica // assign the new replica