HBASE-13938 Deletes done during the region merge transaction may get eclipsed (ddas, enis, ndimiduk)

This commit is contained in:
Nick Dimiduk 2015-06-23 10:19:05 -07:00
parent 9db3ea3406
commit 3e72dc9f08
10 changed files with 282 additions and 64 deletions

View File

@ -1306,12 +1306,20 @@ public class MetaTableAccessor {
* table * table
*/ */
public static Delete makeDeleteFromRegionInfo(HRegionInfo regionInfo) { public static Delete makeDeleteFromRegionInfo(HRegionInfo regionInfo) {
long now = EnvironmentEdgeManager.currentTime();
return makeDeleteFromRegionInfo(regionInfo, now);
}
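/**
* Generates and returns a Delete containing the region info for the catalog
* table, using the supplied timestamp for the catalog family delete.
* @param regionInfo the region whose catalog row is to be deleted; must not be null
* @param ts the timestamp to apply to the catalog family delete
* @throws IllegalArgumentException if regionInfo is null
*/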
public static Delete makeDeleteFromRegionInfo(HRegionInfo regionInfo, long ts) {
if (regionInfo == null) { if (regionInfo == null) {
throw new IllegalArgumentException("Can't make a delete for null region"); throw new IllegalArgumentException("Can't make a delete for null region");
} }
long now = EnvironmentEdgeManager.currentTime();
Delete delete = new Delete(regionInfo.getRegionName()); Delete delete = new Delete(regionInfo.getRegionName());
delete.addFamily(getCatalogFamily(), now); delete.addFamily(getCatalogFamily(), ts);
return delete; return delete;
} }
@ -1597,25 +1605,30 @@ public class MetaTableAccessor {
* @param regionA * @param regionA
* @param regionB * @param regionB
* @param sn the location of the region * @param sn the location of the region
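* @param masterSystemTime wall clock time passed by the master; the larger of this value and
* the local current time is used as the timestamp of the merge Put and Deletes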
* @throws IOException * @throws IOException
*/ */
public static void mergeRegions(final Connection connection, HRegionInfo mergedRegion, public static void mergeRegions(final Connection connection, HRegionInfo mergedRegion,
HRegionInfo regionA, HRegionInfo regionB, ServerName sn, int regionReplication) HRegionInfo regionA, HRegionInfo regionB, ServerName sn, int regionReplication,
long masterSystemTime)
throws IOException { throws IOException {
Table meta = getMetaHTable(connection); Table meta = getMetaHTable(connection);
try { try {
HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion); HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion);
// use the maximum of what master passed us vs local time.
long time = Math.max(EnvironmentEdgeManager.currentTime(), masterSystemTime);
// Put for parent // Put for parent
Put putOfMerged = makePutFromRegionInfo(copyOfMerged); Put putOfMerged = makePutFromRegionInfo(copyOfMerged, time);
putOfMerged.addImmutable(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER, putOfMerged.addImmutable(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER,
regionA.toByteArray()); regionA.toByteArray());
putOfMerged.addImmutable(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER, putOfMerged.addImmutable(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER,
regionB.toByteArray()); regionB.toByteArray());
// Deletes for merging regions // Deletes for merging regions
Delete deleteA = makeDeleteFromRegionInfo(regionA); Delete deleteA = makeDeleteFromRegionInfo(regionA, time);
Delete deleteB = makeDeleteFromRegionInfo(regionB); Delete deleteB = makeDeleteFromRegionInfo(regionB, time);
// The merged is a new region, openSeqNum = 1 is fine. // The merged is a new region, openSeqNum = 1 is fine.
addLocation(putOfMerged, sn, 1, -1, mergedRegion.getReplicaId()); addLocation(putOfMerged, sn, 1, -1, mergedRegion.getReplicaId());

View File

@ -925,6 +925,8 @@ public final class RequestConverter {
builder.setRegionA(regionASpecifier); builder.setRegionA(regionASpecifier);
builder.setRegionB(regionBSpecifier); builder.setRegionB(regionBSpecifier);
builder.setForcible(forcible); builder.setForcible(forcible);
// send the master's wall clock time as well, so that the RS can refer to it
builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
return builder.build(); return builder.build();
} }

View File

@ -14642,6 +14642,24 @@ public final class AdminProtos {
* <code>optional bool forcible = 3 [default = false];</code> * <code>optional bool forcible = 3 [default = false];</code>
*/ */
boolean getForcible(); boolean getForcible();
// optional uint64 master_system_time = 4;
/**
* <code>optional uint64 master_system_time = 4;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
boolean hasMasterSystemTime();
/**
* <code>optional uint64 master_system_time = 4;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
long getMasterSystemTime();
} }
/** /**
* Protobuf type {@code MergeRegionsRequest} * Protobuf type {@code MergeRegionsRequest}
@ -14732,6 +14750,11 @@ public final class AdminProtos {
forcible_ = input.readBool(); forcible_ = input.readBool();
break; break;
} }
case 32: {
bitField0_ |= 0x00000008;
masterSystemTime_ = input.readUInt64();
break;
}
} }
} }
} catch (com.google.protobuf.InvalidProtocolBufferException e) { } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -14832,10 +14855,35 @@ public final class AdminProtos {
return forcible_; return forcible_;
} }
// optional uint64 master_system_time = 4;
public static final int MASTER_SYSTEM_TIME_FIELD_NUMBER = 4;
private long masterSystemTime_;
/**
* <code>optional uint64 master_system_time = 4;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
public boolean hasMasterSystemTime() {
// Presence of field 4 is tracked by bit 0x00000008 of bitField0_.
return ((bitField0_ & 0x00000008) == 0x00000008);
}
/**
* <code>optional uint64 master_system_time = 4;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
public long getMasterSystemTime() {
// Returns the stored value without checking presence; initFields() assigns 0L,
// so callers that need to distinguish "unset" should test hasMasterSystemTime() first.
return masterSystemTime_;
}
private void initFields() { private void initFields() {
regionA_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); regionA_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
regionB_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); regionB_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
forcible_ = false; forcible_ = false;
masterSystemTime_ = 0L;
} }
private byte memoizedIsInitialized = -1; private byte memoizedIsInitialized = -1;
public final boolean isInitialized() { public final boolean isInitialized() {
@ -14874,6 +14922,9 @@ public final class AdminProtos {
if (((bitField0_ & 0x00000004) == 0x00000004)) { if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeBool(3, forcible_); output.writeBool(3, forcible_);
} }
if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeUInt64(4, masterSystemTime_);
}
getUnknownFields().writeTo(output); getUnknownFields().writeTo(output);
} }
@ -14895,6 +14946,10 @@ public final class AdminProtos {
size += com.google.protobuf.CodedOutputStream size += com.google.protobuf.CodedOutputStream
.computeBoolSize(3, forcible_); .computeBoolSize(3, forcible_);
} }
if (((bitField0_ & 0x00000008) == 0x00000008)) {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(4, masterSystemTime_);
}
size += getUnknownFields().getSerializedSize(); size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size; memoizedSerializedSize = size;
return size; return size;
@ -14933,6 +14988,11 @@ public final class AdminProtos {
result = result && (getForcible() result = result && (getForcible()
== other.getForcible()); == other.getForcible());
} }
result = result && (hasMasterSystemTime() == other.hasMasterSystemTime());
if (hasMasterSystemTime()) {
result = result && (getMasterSystemTime()
== other.getMasterSystemTime());
}
result = result && result = result &&
getUnknownFields().equals(other.getUnknownFields()); getUnknownFields().equals(other.getUnknownFields());
return result; return result;
@ -14958,6 +15018,10 @@ public final class AdminProtos {
hash = (37 * hash) + FORCIBLE_FIELD_NUMBER; hash = (37 * hash) + FORCIBLE_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getForcible()); hash = (53 * hash) + hashBoolean(getForcible());
} }
if (hasMasterSystemTime()) {
hash = (37 * hash) + MASTER_SYSTEM_TIME_FIELD_NUMBER;
hash = (53 * hash) + hashLong(getMasterSystemTime());
}
hash = (29 * hash) + getUnknownFields().hashCode(); hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash; memoizedHashCode = hash;
return hash; return hash;
@ -15090,6 +15154,8 @@ public final class AdminProtos {
bitField0_ = (bitField0_ & ~0x00000002); bitField0_ = (bitField0_ & ~0x00000002);
forcible_ = false; forcible_ = false;
bitField0_ = (bitField0_ & ~0x00000004); bitField0_ = (bitField0_ & ~0x00000004);
masterSystemTime_ = 0L;
bitField0_ = (bitField0_ & ~0x00000008);
return this; return this;
} }
@ -15138,6 +15204,10 @@ public final class AdminProtos {
to_bitField0_ |= 0x00000004; to_bitField0_ |= 0x00000004;
} }
result.forcible_ = forcible_; result.forcible_ = forcible_;
if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
to_bitField0_ |= 0x00000008;
}
result.masterSystemTime_ = masterSystemTime_;
result.bitField0_ = to_bitField0_; result.bitField0_ = to_bitField0_;
onBuilt(); onBuilt();
return result; return result;
@ -15163,6 +15233,9 @@ public final class AdminProtos {
if (other.hasForcible()) { if (other.hasForcible()) {
setForcible(other.getForcible()); setForcible(other.getForcible());
} }
if (other.hasMasterSystemTime()) {
setMasterSystemTime(other.getMasterSystemTime());
}
this.mergeUnknownFields(other.getUnknownFields()); this.mergeUnknownFields(other.getUnknownFields());
return this; return this;
} }
@ -15473,6 +15546,55 @@ public final class AdminProtos {
return this; return this;
} }
// optional uint64 master_system_time = 4;
private long masterSystemTime_ ;
/**
* <code>optional uint64 master_system_time = 4;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
public boolean hasMasterSystemTime() {
// Presence of field 4 is tracked by bit 0x00000008 of the builder's bitField0_.
return ((bitField0_ & 0x00000008) == 0x00000008);
}
/**
* <code>optional uint64 master_system_time = 4;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
public long getMasterSystemTime() {
// Returns the builder's current value without checking the presence bit.
return masterSystemTime_;
}
/**
* <code>optional uint64 master_system_time = 4;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
public Builder setMasterSystemTime(long value) {
// Mark field 4 as present, then store the value.
bitField0_ |= 0x00000008;
masterSystemTime_ = value;
onChanged();
// Return this builder so calls can be chained.
return this;
}
/**
* <code>optional uint64 master_system_time = 4;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
public Builder clearMasterSystemTime() {
// Drop the presence bit for field 4 and reset the stored value to 0L.
bitField0_ = (bitField0_ & ~0x00000008);
masterSystemTime_ = 0L;
onChanged();
// Return this builder so calls can be chained.
return this;
}
// @@protoc_insertion_point(builder_scope:MergeRegionsRequest) // @@protoc_insertion_point(builder_scope:MergeRegionsRequest)
} }
@ -23406,53 +23528,53 @@ public final class AdminProtos {
"UpdateInfo\022\033\n\006region\030\001 \002(\0132\013.RegionInfo\022" + "UpdateInfo\022\033\n\006region\030\001 \002(\0132\013.RegionInfo\022" +
"\"\n\rfavored_nodes\030\002 \003(\0132\013.ServerName\".\n\032U" + "\"\n\rfavored_nodes\030\002 \003(\0132\013.ServerName\".\n\032U" +
"pdateFavoredNodesResponse\022\020\n\010response\030\001 " + "pdateFavoredNodesResponse\022\020\n\010response\030\001 " +
"\001(\r\"v\n\023MergeRegionsRequest\022\"\n\010region_a\030\001", "\001(\r\"\222\001\n\023MergeRegionsRequest\022\"\n\010region_a\030",
" \002(\0132\020.RegionSpecifier\022\"\n\010region_b\030\002 \002(\013" + "\001 \002(\0132\020.RegionSpecifier\022\"\n\010region_b\030\002 \002(" +
"2\020.RegionSpecifier\022\027\n\010forcible\030\003 \001(\010:\005fa" + "\0132\020.RegionSpecifier\022\027\n\010forcible\030\003 \001(\010:\005f" +
"lse\"\026\n\024MergeRegionsResponse\"X\n\010WALEntry\022" + "alse\022\032\n\022master_system_time\030\004 \001(\004\"\026\n\024Merg" +
"\024\n\003key\030\001 \002(\0132\007.WALKey\022\027\n\017key_value_bytes" + "eRegionsResponse\"X\n\010WALEntry\022\024\n\003key\030\001 \002(" +
"\030\002 \003(\014\022\035\n\025associated_cell_count\030\003 \001(\005\"4\n" + "\0132\007.WALKey\022\027\n\017key_value_bytes\030\002 \003(\014\022\035\n\025a" +
"\030ReplicateWALEntryRequest\022\030\n\005entry\030\001 \003(\013" + "ssociated_cell_count\030\003 \001(\005\"4\n\030ReplicateW" +
"2\t.WALEntry\"\033\n\031ReplicateWALEntryResponse" + "ALEntryRequest\022\030\n\005entry\030\001 \003(\0132\t.WALEntry" +
"\"\026\n\024RollWALWriterRequest\"0\n\025RollWALWrite" + "\"\033\n\031ReplicateWALEntryResponse\"\026\n\024RollWAL" +
"rResponse\022\027\n\017region_to_flush\030\001 \003(\014\"#\n\021St" + "WriterRequest\"0\n\025RollWALWriterResponse\022\027" +
"opServerRequest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopS", "\n\017region_to_flush\030\001 \003(\014\"#\n\021StopServerReq",
"erverResponse\"\026\n\024GetServerInfoRequest\"B\n" + "uest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServerRespon" +
"\nServerInfo\022 \n\013server_name\030\001 \002(\0132\013.Serve" + "se\"\026\n\024GetServerInfoRequest\"B\n\nServerInfo" +
"rName\022\022\n\nwebui_port\030\002 \001(\r\"9\n\025GetServerIn" + "\022 \n\013server_name\030\001 \002(\0132\013.ServerName\022\022\n\nwe" +
"foResponse\022 \n\013server_info\030\001 \002(\0132\013.Server" + "bui_port\030\002 \001(\r\"9\n\025GetServerInfoResponse\022" +
"Info\"\034\n\032UpdateConfigurationRequest\"\035\n\033Up" + " \n\013server_info\030\001 \002(\0132\013.ServerInfo\"\034\n\032Upd" +
"dateConfigurationResponse2\325\010\n\014AdminServi" + "ateConfigurationRequest\"\035\n\033UpdateConfigu" +
"ce\022>\n\rGetRegionInfo\022\025.GetRegionInfoReque" + "rationResponse2\325\010\n\014AdminService\022>\n\rGetRe" +
"st\032\026.GetRegionInfoResponse\022;\n\014GetStoreFi" + "gionInfo\022\025.GetRegionInfoRequest\032\026.GetReg" +
"le\022\024.GetStoreFileRequest\032\025.GetStoreFileR" + "ionInfoResponse\022;\n\014GetStoreFile\022\024.GetSto" +
"esponse\022D\n\017GetOnlineRegion\022\027.GetOnlineRe", "reFileRequest\032\025.GetStoreFileResponse\022D\n\017",
"gionRequest\032\030.GetOnlineRegionResponse\0225\n" + "GetOnlineRegion\022\027.GetOnlineRegionRequest" +
"\nOpenRegion\022\022.OpenRegionRequest\032\023.OpenRe" + "\032\030.GetOnlineRegionResponse\0225\n\nOpenRegion" +
"gionResponse\022;\n\014WarmupRegion\022\024.WarmupReg" + "\022\022.OpenRegionRequest\032\023.OpenRegionRespons" +
"ionRequest\032\025.WarmupRegionResponse\0228\n\013Clo" + "e\022;\n\014WarmupRegion\022\024.WarmupRegionRequest\032" +
"seRegion\022\023.CloseRegionRequest\032\024.CloseReg" + "\025.WarmupRegionResponse\0228\n\013CloseRegion\022\023." +
"ionResponse\0228\n\013FlushRegion\022\023.FlushRegion" + "CloseRegionRequest\032\024.CloseRegionResponse" +
"Request\032\024.FlushRegionResponse\0228\n\013SplitRe" + "\0228\n\013FlushRegion\022\023.FlushRegionRequest\032\024.F" +
"gion\022\023.SplitRegionRequest\032\024.SplitRegionR" + "lushRegionResponse\0228\n\013SplitRegion\022\023.Spli" +
"esponse\022>\n\rCompactRegion\022\025.CompactRegion" + "tRegionRequest\032\024.SplitRegionResponse\022>\n\r" +
"Request\032\026.CompactRegionResponse\022;\n\014Merge", "CompactRegion\022\025.CompactRegionRequest\032\026.C",
"Regions\022\024.MergeRegionsRequest\032\025.MergeReg" + "ompactRegionResponse\022;\n\014MergeRegions\022\024.M" +
"ionsResponse\022J\n\021ReplicateWALEntry\022\031.Repl" + "ergeRegionsRequest\032\025.MergeRegionsRespons" +
"icateWALEntryRequest\032\032.ReplicateWALEntry" + "e\022J\n\021ReplicateWALEntry\022\031.ReplicateWALEnt" +
"Response\022?\n\006Replay\022\031.ReplicateWALEntryRe" + "ryRequest\032\032.ReplicateWALEntryResponse\022?\n" +
"quest\032\032.ReplicateWALEntryResponse\022>\n\rRol" + "\006Replay\022\031.ReplicateWALEntryRequest\032\032.Rep" +
"lWALWriter\022\025.RollWALWriterRequest\032\026.Roll" + "licateWALEntryResponse\022>\n\rRollWALWriter\022" +
"WALWriterResponse\022>\n\rGetServerInfo\022\025.Get" + "\025.RollWALWriterRequest\032\026.RollWALWriterRe" +
"ServerInfoRequest\032\026.GetServerInfoRespons" + "sponse\022>\n\rGetServerInfo\022\025.GetServerInfoR" +
"e\0225\n\nStopServer\022\022.StopServerRequest\032\023.St" + "equest\032\026.GetServerInfoResponse\0225\n\nStopSe" +
"opServerResponse\022M\n\022UpdateFavoredNodes\022\032", "rver\022\022.StopServerRequest\032\023.StopServerRes",
".UpdateFavoredNodesRequest\032\033.UpdateFavor" + "ponse\022M\n\022UpdateFavoredNodes\022\032.UpdateFavo" +
"edNodesResponse\022P\n\023UpdateConfiguration\022\033" + "redNodesRequest\032\033.UpdateFavoredNodesResp" +
".UpdateConfigurationRequest\032\034.UpdateConf" + "onse\022P\n\023UpdateConfiguration\022\033.UpdateConf" +
"igurationResponseBA\n*org.apache.hadoop.h" + "igurationRequest\032\034.UpdateConfigurationRe" +
"base.protobuf.generatedB\013AdminProtosH\001\210\001" + "sponseBA\n*org.apache.hadoop.hbase.protob" +
"\001\240\001\001" "uf.generatedB\013AdminProtosH\001\210\001\001\240\001\001"
}; };
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -23596,7 +23718,7 @@ public final class AdminProtos {
internal_static_MergeRegionsRequest_fieldAccessorTable = new internal_static_MergeRegionsRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable( com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_MergeRegionsRequest_descriptor, internal_static_MergeRegionsRequest_descriptor,
new java.lang.String[] { "RegionA", "RegionB", "Forcible", }); new java.lang.String[] { "RegionA", "RegionB", "Forcible", "MasterSystemTime", });
internal_static_MergeRegionsResponse_descriptor = internal_static_MergeRegionsResponse_descriptor =
getDescriptor().getMessageTypes().get(21); getDescriptor().getMessageTypes().get(21);
internal_static_MergeRegionsResponse_fieldAccessorTable = new internal_static_MergeRegionsResponse_fieldAccessorTable = new

View File

@ -185,6 +185,8 @@ message MergeRegionsRequest {
required RegionSpecifier region_a = 1; required RegionSpecifier region_a = 1;
required RegionSpecifier region_b = 2; required RegionSpecifier region_b = 2;
optional bool forcible = 3 [default = false]; optional bool forcible = 3 [default = false];
// wall clock time from master
optional uint64 master_system_time = 4;
} }
message MergeRegionsResponse { message MergeRegionsResponse {

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MultiHConnection; import org.apache.hadoop.hbase.util.MultiHConnection;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
@ -246,6 +247,7 @@ public class RegionStateStore {
void mergeRegions(HRegionInfo p, void mergeRegions(HRegionInfo p,
HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException { HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException {
MetaTableAccessor.mergeRegions(server.getConnection(), p, a, b, sn, regionReplication); MetaTableAccessor.mergeRegions(server.getConnection(), p, a, b, sn, regionReplication,
EnvironmentEdgeManager.currentTime());
} }
} }

View File

@ -221,9 +221,9 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
} }
public synchronized void requestRegionsMerge(final Region a, public synchronized void requestRegionsMerge(final Region a,
final Region b, final boolean forcible) { final Region b, final boolean forcible, long masterSystemTime) {
try { try {
mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible)); mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime));
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Region merge requested for " + a + "," + b + ", forcible=" LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
+ forcible + ". " + this); + forcible + ". " + this);

View File

@ -1365,6 +1365,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
Region regionA = getRegion(request.getRegionA()); Region regionA = getRegion(request.getRegionA());
Region regionB = getRegion(request.getRegionB()); Region regionB = getRegion(request.getRegionB());
boolean forcible = request.getForcible(); boolean forcible = request.getForcible();
long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
regionA.startRegionOperation(Operation.MERGE_REGION); regionA.startRegionOperation(Operation.MERGE_REGION);
regionB.startRegionOperation(Operation.MERGE_REGION); regionB.startRegionOperation(Operation.MERGE_REGION);
if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID || if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
@ -1385,7 +1386,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
long endTime = EnvironmentEdgeManager.currentTime(); long endTime = EnvironmentEdgeManager.currentTime();
regionServer.metricsRegionServer.updateFlushTime(endTime - startTime); regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
} }
regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible); regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
masterSystemTime);
return MergeRegionsResponse.newBuilder().build(); return MergeRegionsResponse.newBuilder().build();
} catch (DroppedSnapshotException ex) { } catch (DroppedSnapshotException ex) {
regionServer.abort("Replay of WAL required. Forcing server shutdown", ex); regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);

View File

@ -42,13 +42,16 @@ class RegionMergeRequest implements Runnable {
private final HRegionServer server; private final HRegionServer server;
private final boolean forcible; private final boolean forcible;
private TableLock tableLock; private TableLock tableLock;
private final long masterSystemTime;
RegionMergeRequest(Region a, Region b, HRegionServer hrs, boolean forcible) { RegionMergeRequest(Region a, Region b, HRegionServer hrs, boolean forcible,
long masterSystemTime) {
Preconditions.checkNotNull(hrs); Preconditions.checkNotNull(hrs);
this.region_a = (HRegion)a; this.region_a = (HRegion)a;
this.region_b = (HRegion)b; this.region_b = (HRegion)b;
this.server = hrs; this.server = hrs;
this.forcible = forcible; this.forcible = forcible;
this.masterSystemTime = masterSystemTime;
} }
@Override @Override
@ -67,7 +70,7 @@ class RegionMergeRequest implements Runnable {
try { try {
final long startTime = EnvironmentEdgeManager.currentTime(); final long startTime = EnvironmentEdgeManager.currentTime();
RegionMergeTransactionImpl mt = new RegionMergeTransactionImpl(region_a, RegionMergeTransactionImpl mt = new RegionMergeTransactionImpl(region_a,
region_b, forcible); region_b, forcible, masterSystemTime);
//acquire a shared read lock on the table, so that table schema modifications //acquire a shared read lock on the table, so that table schema modifications
//do not happen concurrently //do not happen concurrently

View File

@ -58,6 +58,7 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
private final Path mergesdir; private final Path mergesdir;
// We only merge adjacent regions if forcible is false // We only merge adjacent regions if forcible is false
private final boolean forcible; private final boolean forcible;
private final long masterSystemTime;
/* /*
* Transaction state for listener, only valid during execute and * Transaction state for listener, only valid during execute and
@ -123,6 +124,17 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
*/ */
public RegionMergeTransactionImpl(final Region a, final Region b, public RegionMergeTransactionImpl(final Region a, final Region b,
final boolean forcible) { final boolean forcible) {
this(a, b, forcible, EnvironmentEdgeManager.currentTime());
}
/**
* Constructor
* @param a region a to merge
* @param b region b to merge
* @param forcible if false, we will only merge adjacent regions
* @param masterSystemTime the time at the master side
*/
public RegionMergeTransactionImpl(final Region a, final Region b,
final boolean forcible, long masterSystemTime) {
if (a.getRegionInfo().compareTo(b.getRegionInfo()) <= 0) { if (a.getRegionInfo().compareTo(b.getRegionInfo()) <= 0) {
this.region_a = (HRegion)a; this.region_a = (HRegion)a;
this.region_b = (HRegion)b; this.region_b = (HRegion)b;
@ -131,6 +143,7 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
this.region_b = (HRegion)a; this.region_b = (HRegion)a;
} }
this.forcible = forcible; this.forcible = forcible;
this.masterSystemTime = masterSystemTime;
this.mergesdir = region_a.getRegionFileSystem().getMergesDir(); this.mergesdir = region_a.getRegionFileSystem().getMergesDir();
} }
@ -313,16 +326,19 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
HRegionInfo regionB, ServerName serverName, List<Mutation> mutations) throws IOException { HRegionInfo regionB, ServerName serverName, List<Mutation> mutations) throws IOException {
HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion); HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion);
// use the maximum of what master passed us vs local time.
long time = Math.max(EnvironmentEdgeManager.currentTime(), masterSystemTime);
// Put for parent // Put for parent
Put putOfMerged = MetaTableAccessor.makePutFromRegionInfo(copyOfMerged); Put putOfMerged = MetaTableAccessor.makePutFromRegionInfo(copyOfMerged, time);
putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER, putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER,
regionA.toByteArray()); regionA.toByteArray());
putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER, putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER,
regionB.toByteArray()); regionB.toByteArray());
mutations.add(putOfMerged); mutations.add(putOfMerged);
// Deletes for merging regions // Deletes for merging regions
Delete deleteA = MetaTableAccessor.makeDeleteFromRegionInfo(regionA); Delete deleteA = MetaTableAccessor.makeDeleteFromRegionInfo(regionA, time);
Delete deleteB = MetaTableAccessor.makeDeleteFromRegionInfo(regionB); Delete deleteB = MetaTableAccessor.makeDeleteFromRegionInfo(regionB, time);
mutations.add(deleteA); mutations.add(deleteA);
mutations.add(deleteB); mutations.add(deleteB);
// The merged is a new region, openSeqNum = 1 is fine. // The merged is a new region, openSeqNum = 1 is fine.

View File

@ -474,7 +474,8 @@ public class TestMetaTableAccessor {
List<HRegionInfo> regionInfos = Lists.newArrayList(parentA, parentB); List<HRegionInfo> regionInfos = Lists.newArrayList(parentA, parentB);
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3); MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3); MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3,
HConstants.LATEST_TIMESTAMP);
assertEmptyMetaLocation(meta, merged.getRegionName(), 1); assertEmptyMetaLocation(meta, merged.getRegionName(), 1);
assertEmptyMetaLocation(meta, merged.getRegionName(), 2); assertEmptyMetaLocation(meta, merged.getRegionName(), 2);
@ -527,6 +528,9 @@ public class TestMetaTableAccessor {
table.close(); table.close();
} }
/**
* Tests whether the maximum of the master's system time and the RS's local system time is used
*/
@Test @Test
public void testMastersSystemTimeIsUsedInUpdateLocations() throws IOException { public void testMastersSystemTimeIsUsedInUpdateLocations() throws IOException {
long regionId = System.currentTimeMillis(); long regionId = System.currentTimeMillis();
@ -563,5 +567,57 @@ public class TestMetaTableAccessor {
meta.close(); meta.close();
} }
} }
/**
* Tests that MetaTableAccessor.mergeRegions applies a master-supplied time that is ahead of
* the local clock: a region location written with a future timestamp must no longer be
* visible once the merge has been recorded with an even later master time.
*/
@Test
public void testMastersSystemTimeIsUsedInMergeRegions() throws IOException {
long regionId = System.currentTimeMillis();
HRegionInfo regionInfoA = new HRegionInfo(TableName.valueOf("table_foo"),
HConstants.EMPTY_START_ROW, new byte[] {'a'}, false, regionId, 0);
HRegionInfo regionInfoB = new HRegionInfo(TableName.valueOf("table_foo"),
new byte[] {'a'}, HConstants.EMPTY_END_ROW, false, regionId, 0);
HRegionInfo mergedRegionInfo = new HRegionInfo(TableName.valueOf("table_foo"),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false, regionId, 0);
ServerName sn = ServerName.valueOf("bar", 0, 0);
Table meta = MetaTableAccessor.getMetaHTable(connection);
try {
List<HRegionInfo> regionInfos = Lists.newArrayList(regionInfoA, regionInfoB);
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1);
// Write the serverName column with a timestamp well ahead of the current time, and pick a
// master time that is later still. When the region merge deletes the rows for regionA and
// regionB, the serverName columns should not be seen by the following get.
long serverNameTime = EnvironmentEdgeManager.currentTime() + 100000000;
long masterSystemTime = EnvironmentEdgeManager.currentTime() + 123456789;
// write the serverName columns
MetaTableAccessor.updateRegionLocation(connection, regionInfoA, sn, 1, serverNameTime);
// assert that we have the serverName column with expected ts
// NOTE(review): the Get is keyed by mergedRegionInfo's region name although the location was
// written for regionInfoA; presumably both map to the same meta row because they share
// table, start key and regionId -- confirm against HRegionInfo's region name format.
Get get = new Get(mergedRegionInfo.getRegionName());
Result result = meta.get(get);
Cell serverCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY,
MetaTableAccessor.getServerColumn(0));
assertNotNull(serverCell);
assertEquals(serverNameTime, serverCell.getTimestamp());
// now merge the regions, effectively deleting the rows for region a and b.
MetaTableAccessor.mergeRegions(connection, mergedRegionInfo,
regionInfoA, regionInfoB, sn, 1, masterSystemTime);
// Re-read the same row: the location columns written above must now be hidden.
result = meta.get(get);
serverCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY,
MetaTableAccessor.getServerColumn(0));
Cell startCodeCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY,
MetaTableAccessor.getStartCodeColumn(0));
Cell seqNumCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY,
MetaTableAccessor.getSeqNumColumn(0));
assertNull(serverCell);
assertNull(startCodeCell);
assertNull(seqNumCell);
} finally {
// Always release the meta table handle, even when an assertion fails.
meta.close();
}
}
} }