HBASE-13938 Deletes done during the region merge transaction may get eclipsed (ddas, enis, ndimiduk)

This commit is contained in:
Nick Dimiduk 2015-06-22 10:23:13 -07:00
parent 0271afc1b7
commit 5a24d16f4c
10 changed files with 286 additions and 65 deletions

View File

@ -945,10 +945,18 @@ public class MetaTableAccessor {
* table * table
*/ */
public static Delete makeDeleteFromRegionInfo(HRegionInfo regionInfo) { public static Delete makeDeleteFromRegionInfo(HRegionInfo regionInfo) {
return makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP);
}
/**
* Generates and returns a Delete for the given region's row in the catalog
* table, stamped with the supplied timestamp {@code ts}
*/
public static Delete makeDeleteFromRegionInfo(HRegionInfo regionInfo, long ts) {
if (regionInfo == null) { if (regionInfo == null) {
throw new IllegalArgumentException("Can't make a delete for null region"); throw new IllegalArgumentException("Can't make a delete for null region");
} }
Delete delete = new Delete(regionInfo.getRegionName()); Delete delete = new Delete(regionInfo.getRegionName(), ts);
return delete; return delete;
} }
@ -1217,25 +1225,30 @@ public class MetaTableAccessor {
* @param regionA * @param regionA
* @param regionB * @param regionB
* @param sn the location of the region * @param sn the location of the region
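* @param masterSystemTime wall clock time passed in by the master; the larger of this and the local time is used as the timestamp of the meta Put and Deletes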
* @throws IOException * @throws IOException
*/ */
public static void mergeRegions(final Connection connection, HRegionInfo mergedRegion, public static void mergeRegions(final Connection connection, HRegionInfo mergedRegion,
HRegionInfo regionA, HRegionInfo regionB, ServerName sn, int regionReplication) HRegionInfo regionA, HRegionInfo regionB, ServerName sn, int regionReplication,
long masterSystemTime)
throws IOException { throws IOException {
Table meta = getMetaHTable(connection); Table meta = getMetaHTable(connection);
try { try {
HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion); HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion);
// use the maximum of what master passed us vs local time.
long time = Math.max(EnvironmentEdgeManager.currentTime(), masterSystemTime);
// Put for parent // Put for parent
Put putOfMerged = makePutFromRegionInfo(copyOfMerged); Put putOfMerged = makePutFromRegionInfo(copyOfMerged, time);
putOfMerged.addImmutable(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER, putOfMerged.addImmutable(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER,
regionA.toByteArray()); regionA.toByteArray());
putOfMerged.addImmutable(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER, putOfMerged.addImmutable(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER,
regionB.toByteArray()); regionB.toByteArray());
// Deletes for merging regions // Deletes for merging regions
Delete deleteA = makeDeleteFromRegionInfo(regionA); Delete deleteA = makeDeleteFromRegionInfo(regionA, time);
Delete deleteB = makeDeleteFromRegionInfo(regionB); Delete deleteB = makeDeleteFromRegionInfo(regionB, time);
// The merged is a new region, openSeqNum = 1 is fine. // The merged is a new region, openSeqNum = 1 is fine.
addLocation(putOfMerged, sn, 1, -1, mergedRegion.getReplicaId()); addLocation(putOfMerged, sn, 1, -1, mergedRegion.getReplicaId());

View File

@ -945,6 +945,8 @@ public final class RequestConverter {
builder.setRegionA(regionASpecifier); builder.setRegionA(regionASpecifier);
builder.setRegionB(regionBSpecifier); builder.setRegionB(regionBSpecifier);
builder.setForcible(forcible); builder.setForcible(forcible);
// send the master's wall clock time as well, so that the RS can refer to it
builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
return builder.build(); return builder.build();
} }

View File

@ -14642,6 +14642,24 @@ public final class AdminProtos {
* <code>optional bool forcible = 3 [default = false];</code> * <code>optional bool forcible = 3 [default = false];</code>
*/ */
boolean getForcible(); boolean getForcible();
// optional uint64 master_system_time = 4;
/**
* <code>optional uint64 master_system_time = 4;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
boolean hasMasterSystemTime();
/**
* <code>optional uint64 master_system_time = 4;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
long getMasterSystemTime();
} }
/** /**
* Protobuf type {@code MergeRegionsRequest} * Protobuf type {@code MergeRegionsRequest}
@ -14732,6 +14750,11 @@ public final class AdminProtos {
forcible_ = input.readBool(); forcible_ = input.readBool();
break; break;
} }
case 32: {
bitField0_ |= 0x00000008;
masterSystemTime_ = input.readUInt64();
break;
}
} }
} }
} catch (com.google.protobuf.InvalidProtocolBufferException e) { } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -14832,10 +14855,35 @@ public final class AdminProtos {
return forcible_; return forcible_;
} }
// optional uint64 master_system_time = 4;
public static final int MASTER_SYSTEM_TIME_FIELD_NUMBER = 4;
private long masterSystemTime_;
/**
* <code>optional uint64 master_system_time = 4;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
public boolean hasMasterSystemTime() {
return ((bitField0_ & 0x00000008) == 0x00000008);
}
/**
* <code>optional uint64 master_system_time = 4;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
public long getMasterSystemTime() {
return masterSystemTime_;
}
private void initFields() { private void initFields() {
regionA_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); regionA_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
regionB_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); regionB_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
forcible_ = false; forcible_ = false;
masterSystemTime_ = 0L;
} }
private byte memoizedIsInitialized = -1; private byte memoizedIsInitialized = -1;
public final boolean isInitialized() { public final boolean isInitialized() {
@ -14874,6 +14922,9 @@ public final class AdminProtos {
if (((bitField0_ & 0x00000004) == 0x00000004)) { if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeBool(3, forcible_); output.writeBool(3, forcible_);
} }
if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeUInt64(4, masterSystemTime_);
}
getUnknownFields().writeTo(output); getUnknownFields().writeTo(output);
} }
@ -14895,6 +14946,10 @@ public final class AdminProtos {
size += com.google.protobuf.CodedOutputStream size += com.google.protobuf.CodedOutputStream
.computeBoolSize(3, forcible_); .computeBoolSize(3, forcible_);
} }
if (((bitField0_ & 0x00000008) == 0x00000008)) {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(4, masterSystemTime_);
}
size += getUnknownFields().getSerializedSize(); size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size; memoizedSerializedSize = size;
return size; return size;
@ -14933,6 +14988,11 @@ public final class AdminProtos {
result = result && (getForcible() result = result && (getForcible()
== other.getForcible()); == other.getForcible());
} }
result = result && (hasMasterSystemTime() == other.hasMasterSystemTime());
if (hasMasterSystemTime()) {
result = result && (getMasterSystemTime()
== other.getMasterSystemTime());
}
result = result && result = result &&
getUnknownFields().equals(other.getUnknownFields()); getUnknownFields().equals(other.getUnknownFields());
return result; return result;
@ -14958,6 +15018,10 @@ public final class AdminProtos {
hash = (37 * hash) + FORCIBLE_FIELD_NUMBER; hash = (37 * hash) + FORCIBLE_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getForcible()); hash = (53 * hash) + hashBoolean(getForcible());
} }
if (hasMasterSystemTime()) {
hash = (37 * hash) + MASTER_SYSTEM_TIME_FIELD_NUMBER;
hash = (53 * hash) + hashLong(getMasterSystemTime());
}
hash = (29 * hash) + getUnknownFields().hashCode(); hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash; memoizedHashCode = hash;
return hash; return hash;
@ -15090,6 +15154,8 @@ public final class AdminProtos {
bitField0_ = (bitField0_ & ~0x00000002); bitField0_ = (bitField0_ & ~0x00000002);
forcible_ = false; forcible_ = false;
bitField0_ = (bitField0_ & ~0x00000004); bitField0_ = (bitField0_ & ~0x00000004);
masterSystemTime_ = 0L;
bitField0_ = (bitField0_ & ~0x00000008);
return this; return this;
} }
@ -15138,6 +15204,10 @@ public final class AdminProtos {
to_bitField0_ |= 0x00000004; to_bitField0_ |= 0x00000004;
} }
result.forcible_ = forcible_; result.forcible_ = forcible_;
if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
to_bitField0_ |= 0x00000008;
}
result.masterSystemTime_ = masterSystemTime_;
result.bitField0_ = to_bitField0_; result.bitField0_ = to_bitField0_;
onBuilt(); onBuilt();
return result; return result;
@ -15163,6 +15233,9 @@ public final class AdminProtos {
if (other.hasForcible()) { if (other.hasForcible()) {
setForcible(other.getForcible()); setForcible(other.getForcible());
} }
if (other.hasMasterSystemTime()) {
setMasterSystemTime(other.getMasterSystemTime());
}
this.mergeUnknownFields(other.getUnknownFields()); this.mergeUnknownFields(other.getUnknownFields());
return this; return this;
} }
@ -15473,6 +15546,55 @@ public final class AdminProtos {
return this; return this;
} }
// optional uint64 master_system_time = 4;
private long masterSystemTime_ ;
/**
* <code>optional uint64 master_system_time = 4;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
public boolean hasMasterSystemTime() {
return ((bitField0_ & 0x00000008) == 0x00000008);
}
/**
* <code>optional uint64 master_system_time = 4;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
public long getMasterSystemTime() {
return masterSystemTime_;
}
/**
* <code>optional uint64 master_system_time = 4;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
public Builder setMasterSystemTime(long value) {
bitField0_ |= 0x00000008;
masterSystemTime_ = value;
onChanged();
return this;
}
/**
* <code>optional uint64 master_system_time = 4;</code>
*
* <pre>
* wall clock time from master
* </pre>
*/
public Builder clearMasterSystemTime() {
bitField0_ = (bitField0_ & ~0x00000008);
masterSystemTime_ = 0L;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:MergeRegionsRequest) // @@protoc_insertion_point(builder_scope:MergeRegionsRequest)
} }
@ -23406,53 +23528,53 @@ public final class AdminProtos {
"UpdateInfo\022\033\n\006region\030\001 \002(\0132\013.RegionInfo\022" + "UpdateInfo\022\033\n\006region\030\001 \002(\0132\013.RegionInfo\022" +
"\"\n\rfavored_nodes\030\002 \003(\0132\013.ServerName\".\n\032U" + "\"\n\rfavored_nodes\030\002 \003(\0132\013.ServerName\".\n\032U" +
"pdateFavoredNodesResponse\022\020\n\010response\030\001 " + "pdateFavoredNodesResponse\022\020\n\010response\030\001 " +
"\001(\r\"v\n\023MergeRegionsRequest\022\"\n\010region_a\030\001", "\001(\r\"\222\001\n\023MergeRegionsRequest\022\"\n\010region_a\030",
" \002(\0132\020.RegionSpecifier\022\"\n\010region_b\030\002 \002(\013" + "\001 \002(\0132\020.RegionSpecifier\022\"\n\010region_b\030\002 \002(" +
"2\020.RegionSpecifier\022\027\n\010forcible\030\003 \001(\010:\005fa" + "\0132\020.RegionSpecifier\022\027\n\010forcible\030\003 \001(\010:\005f" +
"lse\"\026\n\024MergeRegionsResponse\"X\n\010WALEntry\022" + "alse\022\032\n\022master_system_time\030\004 \001(\004\"\026\n\024Merg" +
"\024\n\003key\030\001 \002(\0132\007.WALKey\022\027\n\017key_value_bytes" + "eRegionsResponse\"X\n\010WALEntry\022\024\n\003key\030\001 \002(" +
"\030\002 \003(\014\022\035\n\025associated_cell_count\030\003 \001(\005\"4\n" + "\0132\007.WALKey\022\027\n\017key_value_bytes\030\002 \003(\014\022\035\n\025a" +
"\030ReplicateWALEntryRequest\022\030\n\005entry\030\001 \003(\013" + "ssociated_cell_count\030\003 \001(\005\"4\n\030ReplicateW" +
"2\t.WALEntry\"\033\n\031ReplicateWALEntryResponse" + "ALEntryRequest\022\030\n\005entry\030\001 \003(\0132\t.WALEntry" +
"\"\026\n\024RollWALWriterRequest\"0\n\025RollWALWrite" + "\"\033\n\031ReplicateWALEntryResponse\"\026\n\024RollWAL" +
"rResponse\022\027\n\017region_to_flush\030\001 \003(\014\"#\n\021St" + "WriterRequest\"0\n\025RollWALWriterResponse\022\027" +
"opServerRequest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopS", "\n\017region_to_flush\030\001 \003(\014\"#\n\021StopServerReq",
"erverResponse\"\026\n\024GetServerInfoRequest\"B\n" + "uest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServerRespon" +
"\nServerInfo\022 \n\013server_name\030\001 \002(\0132\013.Serve" + "se\"\026\n\024GetServerInfoRequest\"B\n\nServerInfo" +
"rName\022\022\n\nwebui_port\030\002 \001(\r\"9\n\025GetServerIn" + "\022 \n\013server_name\030\001 \002(\0132\013.ServerName\022\022\n\nwe" +
"foResponse\022 \n\013server_info\030\001 \002(\0132\013.Server" + "bui_port\030\002 \001(\r\"9\n\025GetServerInfoResponse\022" +
"Info\"\034\n\032UpdateConfigurationRequest\"\035\n\033Up" + " \n\013server_info\030\001 \002(\0132\013.ServerInfo\"\034\n\032Upd" +
"dateConfigurationResponse2\325\010\n\014AdminServi" + "ateConfigurationRequest\"\035\n\033UpdateConfigu" +
"ce\022>\n\rGetRegionInfo\022\025.GetRegionInfoReque" + "rationResponse2\325\010\n\014AdminService\022>\n\rGetRe" +
"st\032\026.GetRegionInfoResponse\022;\n\014GetStoreFi" + "gionInfo\022\025.GetRegionInfoRequest\032\026.GetReg" +
"le\022\024.GetStoreFileRequest\032\025.GetStoreFileR" + "ionInfoResponse\022;\n\014GetStoreFile\022\024.GetSto" +
"esponse\022D\n\017GetOnlineRegion\022\027.GetOnlineRe", "reFileRequest\032\025.GetStoreFileResponse\022D\n\017",
"gionRequest\032\030.GetOnlineRegionResponse\0225\n" + "GetOnlineRegion\022\027.GetOnlineRegionRequest" +
"\nOpenRegion\022\022.OpenRegionRequest\032\023.OpenRe" + "\032\030.GetOnlineRegionResponse\0225\n\nOpenRegion" +
"gionResponse\022;\n\014WarmupRegion\022\024.WarmupReg" + "\022\022.OpenRegionRequest\032\023.OpenRegionRespons" +
"ionRequest\032\025.WarmupRegionResponse\0228\n\013Clo" + "e\022;\n\014WarmupRegion\022\024.WarmupRegionRequest\032" +
"seRegion\022\023.CloseRegionRequest\032\024.CloseReg" + "\025.WarmupRegionResponse\0228\n\013CloseRegion\022\023." +
"ionResponse\0228\n\013FlushRegion\022\023.FlushRegion" + "CloseRegionRequest\032\024.CloseRegionResponse" +
"Request\032\024.FlushRegionResponse\0228\n\013SplitRe" + "\0228\n\013FlushRegion\022\023.FlushRegionRequest\032\024.F" +
"gion\022\023.SplitRegionRequest\032\024.SplitRegionR" + "lushRegionResponse\0228\n\013SplitRegion\022\023.Spli" +
"esponse\022>\n\rCompactRegion\022\025.CompactRegion" + "tRegionRequest\032\024.SplitRegionResponse\022>\n\r" +
"Request\032\026.CompactRegionResponse\022;\n\014Merge", "CompactRegion\022\025.CompactRegionRequest\032\026.C",
"Regions\022\024.MergeRegionsRequest\032\025.MergeReg" + "ompactRegionResponse\022;\n\014MergeRegions\022\024.M" +
"ionsResponse\022J\n\021ReplicateWALEntry\022\031.Repl" + "ergeRegionsRequest\032\025.MergeRegionsRespons" +
"icateWALEntryRequest\032\032.ReplicateWALEntry" + "e\022J\n\021ReplicateWALEntry\022\031.ReplicateWALEnt" +
"Response\022?\n\006Replay\022\031.ReplicateWALEntryRe" + "ryRequest\032\032.ReplicateWALEntryResponse\022?\n" +
"quest\032\032.ReplicateWALEntryResponse\022>\n\rRol" + "\006Replay\022\031.ReplicateWALEntryRequest\032\032.Rep" +
"lWALWriter\022\025.RollWALWriterRequest\032\026.Roll" + "licateWALEntryResponse\022>\n\rRollWALWriter\022" +
"WALWriterResponse\022>\n\rGetServerInfo\022\025.Get" + "\025.RollWALWriterRequest\032\026.RollWALWriterRe" +
"ServerInfoRequest\032\026.GetServerInfoRespons" + "sponse\022>\n\rGetServerInfo\022\025.GetServerInfoR" +
"e\0225\n\nStopServer\022\022.StopServerRequest\032\023.St" + "equest\032\026.GetServerInfoResponse\0225\n\nStopSe" +
"opServerResponse\022M\n\022UpdateFavoredNodes\022\032", "rver\022\022.StopServerRequest\032\023.StopServerRes",
".UpdateFavoredNodesRequest\032\033.UpdateFavor" + "ponse\022M\n\022UpdateFavoredNodes\022\032.UpdateFavo" +
"edNodesResponse\022P\n\023UpdateConfiguration\022\033" + "redNodesRequest\032\033.UpdateFavoredNodesResp" +
".UpdateConfigurationRequest\032\034.UpdateConf" + "onse\022P\n\023UpdateConfiguration\022\033.UpdateConf" +
"igurationResponseBA\n*org.apache.hadoop.h" + "igurationRequest\032\034.UpdateConfigurationRe" +
"base.protobuf.generatedB\013AdminProtosH\001\210\001" + "sponseBA\n*org.apache.hadoop.hbase.protob" +
"\001\240\001\001" "uf.generatedB\013AdminProtosH\001\210\001\001\240\001\001"
}; };
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -23596,7 +23718,7 @@ public final class AdminProtos {
internal_static_MergeRegionsRequest_fieldAccessorTable = new internal_static_MergeRegionsRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable( com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_MergeRegionsRequest_descriptor, internal_static_MergeRegionsRequest_descriptor,
new java.lang.String[] { "RegionA", "RegionB", "Forcible", }); new java.lang.String[] { "RegionA", "RegionB", "Forcible", "MasterSystemTime", });
internal_static_MergeRegionsResponse_descriptor = internal_static_MergeRegionsResponse_descriptor =
getDescriptor().getMessageTypes().get(21); getDescriptor().getMessageTypes().get(21);
internal_static_MergeRegionsResponse_fieldAccessorTable = new internal_static_MergeRegionsResponse_fieldAccessorTable = new

View File

@ -185,6 +185,8 @@ message MergeRegionsRequest {
required RegionSpecifier region_a = 1; required RegionSpecifier region_a = 1;
required RegionSpecifier region_b = 2; required RegionSpecifier region_b = 2;
optional bool forcible = 3 [default = false]; optional bool forcible = 3 [default = false];
// wall clock time from master
optional uint64 master_system_time = 4;
} }
message MergeRegionsResponse { message MergeRegionsResponse {

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConfigUtil; import org.apache.hadoop.hbase.util.ConfigUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MultiHConnection; import org.apache.hadoop.hbase.util.MultiHConnection;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
@ -256,6 +257,7 @@ public class RegionStateStore {
void mergeRegions(HRegionInfo p, void mergeRegions(HRegionInfo p,
HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException { HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException {
MetaTableAccessor.mergeRegions(server.getConnection(), p, a, b, sn, regionReplication); MetaTableAccessor.mergeRegions(server.getConnection(), p, a, b, sn, regionReplication,
EnvironmentEdgeManager.currentTime());
} }
} }

View File

@ -221,9 +221,9 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
} }
public synchronized void requestRegionsMerge(final Region a, public synchronized void requestRegionsMerge(final Region a,
final Region b, final boolean forcible) { final Region b, final boolean forcible, long masterSystemTime) {
try { try {
mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible)); mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime));
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Region merge requested for " + a + "," + b + ", forcible=" LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
+ forcible + ". " + this); + forcible + ". " + this);

View File

@ -1312,6 +1312,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
Region regionA = getRegion(request.getRegionA()); Region regionA = getRegion(request.getRegionA());
Region regionB = getRegion(request.getRegionB()); Region regionB = getRegion(request.getRegionB());
boolean forcible = request.getForcible(); boolean forcible = request.getForcible();
long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
regionA.startRegionOperation(Operation.MERGE_REGION); regionA.startRegionOperation(Operation.MERGE_REGION);
regionB.startRegionOperation(Operation.MERGE_REGION); regionB.startRegionOperation(Operation.MERGE_REGION);
if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID || if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
@ -1332,7 +1333,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
long endTime = EnvironmentEdgeManager.currentTime(); long endTime = EnvironmentEdgeManager.currentTime();
regionServer.metricsRegionServer.updateFlushTime(endTime - startTime); regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
} }
regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible); regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
masterSystemTime);
return MergeRegionsResponse.newBuilder().build(); return MergeRegionsResponse.newBuilder().build();
} catch (DroppedSnapshotException ex) { } catch (DroppedSnapshotException ex) {
regionServer.abort("Replay of WAL required. Forcing server shutdown", ex); regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);

View File

@ -42,13 +42,16 @@ class RegionMergeRequest implements Runnable {
private final HRegionServer server; private final HRegionServer server;
private final boolean forcible; private final boolean forcible;
private TableLock tableLock; private TableLock tableLock;
private final long masterSystemTime;
RegionMergeRequest(Region a, Region b, HRegionServer hrs, boolean forcible) { RegionMergeRequest(Region a, Region b, HRegionServer hrs, boolean forcible,
long masterSystemTime) {
Preconditions.checkNotNull(hrs); Preconditions.checkNotNull(hrs);
this.region_a = (HRegion)a; this.region_a = (HRegion)a;
this.region_b = (HRegion)b; this.region_b = (HRegion)b;
this.server = hrs; this.server = hrs;
this.forcible = forcible; this.forcible = forcible;
this.masterSystemTime = masterSystemTime;
} }
@Override @Override
@ -67,7 +70,7 @@ class RegionMergeRequest implements Runnable {
try { try {
final long startTime = EnvironmentEdgeManager.currentTime(); final long startTime = EnvironmentEdgeManager.currentTime();
RegionMergeTransactionImpl mt = new RegionMergeTransactionImpl(region_a, RegionMergeTransactionImpl mt = new RegionMergeTransactionImpl(region_a,
region_b, forcible); region_b, forcible, masterSystemTime);
//acquire a shared read lock on the table, so that table schema modifications //acquire a shared read lock on the table, so that table schema modifications
//do not happen concurrently //do not happen concurrently

View File

@ -62,6 +62,7 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
// We only merge adjacent regions if forcible is false // We only merge adjacent regions if forcible is false
private final boolean forcible; private final boolean forcible;
private boolean useCoordinationForAssignment; private boolean useCoordinationForAssignment;
private final long masterSystemTime;
/* /*
* Transaction state for listener, only valid during execute and * Transaction state for listener, only valid during execute and
@ -129,6 +130,17 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
*/ */
public RegionMergeTransactionImpl(final Region a, final Region b, public RegionMergeTransactionImpl(final Region a, final Region b,
final boolean forcible) { final boolean forcible) {
this(a, b, forcible, EnvironmentEdgeManager.currentTime());
}
/**
* Constructor
* @param a region a to merge
* @param b region b to merge
* @param forcible if false, we will only merge adjacent regions
* @param masterSystemTime the time at the master side
*/
public RegionMergeTransactionImpl(final Region a, final Region b,
final boolean forcible, long masterSystemTime) {
if (a.getRegionInfo().compareTo(b.getRegionInfo()) <= 0) { if (a.getRegionInfo().compareTo(b.getRegionInfo()) <= 0) {
this.region_a = (HRegion)a; this.region_a = (HRegion)a;
this.region_b = (HRegion)b; this.region_b = (HRegion)b;
@ -137,6 +149,7 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
this.region_b = (HRegion)a; this.region_b = (HRegion)a;
} }
this.forcible = forcible; this.forcible = forcible;
this.masterSystemTime = masterSystemTime;
this.mergesdir = region_a.getRegionFileSystem().getMergesDir(); this.mergesdir = region_a.getRegionFileSystem().getMergesDir();
} }
@ -168,6 +181,7 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
* @return <code>true</code> if the regions are mergeable else * @return <code>true</code> if the regions are mergeable else
* <code>false</code> if they are not (e.g. its already closed, etc.). * <code>false</code> if they are not (e.g. its already closed, etc.).
*/ */
@Override
public boolean prepare(final RegionServerServices services) throws IOException { public boolean prepare(final RegionServerServices services) throws IOException {
if (!region_a.getTableDesc().getTableName() if (!region_a.getTableDesc().getTableName()
.equals(region_b.getTableDesc().getTableName())) { .equals(region_b.getTableDesc().getTableName())) {
@ -232,6 +246,7 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
* @throws IOException * @throws IOException
* @see #rollback(Server, RegionServerServices) * @see #rollback(Server, RegionServerServices)
*/ */
@Override
public HRegion execute(final Server server, public HRegion execute(final Server server,
final RegionServerServices services) throws IOException { final RegionServerServices services) throws IOException {
this.server = server; this.server = server;
@ -335,7 +350,7 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
if (metaEntries.isEmpty()) { if (metaEntries.isEmpty()) {
MetaTableAccessor.mergeRegions(server.getConnection(), MetaTableAccessor.mergeRegions(server.getConnection(),
mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(), mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(),
server.getServerName(), region_a.getTableDesc().getRegionReplication()); server.getServerName(), region_a.getTableDesc().getRegionReplication(), masterSystemTime);
} else { } else {
mergeRegionsAndPutMetaEntries(server.getConnection(), mergeRegionsAndPutMetaEntries(server.getConnection(),
mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(), mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(),
@ -355,7 +370,7 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
private void mergeRegionsAndPutMetaEntries(HConnection hConnection, private void mergeRegionsAndPutMetaEntries(HConnection hConnection,
HRegionInfo mergedRegion, HRegionInfo regionA, HRegionInfo regionB, HRegionInfo mergedRegion, HRegionInfo regionA, HRegionInfo regionB,
ServerName serverName, List<Mutation> metaEntries, ServerName serverName, List<Mutation> metaEntries,
int regionReplication) throws IOException { int regionReplication) throws IOException {
prepareMutationsForMerge(mergedRegion, regionA, regionB, serverName, metaEntries, prepareMutationsForMerge(mergedRegion, regionA, regionB, serverName, metaEntries,
regionReplication); regionReplication);
@ -367,16 +382,19 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
int regionReplication) throws IOException { int regionReplication) throws IOException {
HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion); HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion);
// use the maximum of what master passed us vs local time.
long time = Math.max(EnvironmentEdgeManager.currentTime(), masterSystemTime);
// Put for parent // Put for parent
Put putOfMerged = MetaTableAccessor.makePutFromRegionInfo(copyOfMerged); Put putOfMerged = MetaTableAccessor.makePutFromRegionInfo(copyOfMerged, time);
putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER, putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER,
regionA.toByteArray()); regionA.toByteArray());
putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER, putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER,
regionB.toByteArray()); regionB.toByteArray());
mutations.add(putOfMerged); mutations.add(putOfMerged);
// Deletes for merging regions // Deletes for merging regions
Delete deleteA = MetaTableAccessor.makeDeleteFromRegionInfo(regionA); Delete deleteA = MetaTableAccessor.makeDeleteFromRegionInfo(regionA, time);
Delete deleteB = MetaTableAccessor.makeDeleteFromRegionInfo(regionB); Delete deleteB = MetaTableAccessor.makeDeleteFromRegionInfo(regionB, time);
mutations.add(deleteA); mutations.add(deleteA);
mutations.add(deleteB); mutations.add(deleteB);
@ -663,6 +681,7 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
* of no return and so now need to abort the server to minimize * of no return and so now need to abort the server to minimize
* damage. * damage.
*/ */
@Override
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
public boolean rollback(final Server server, public boolean rollback(final Server server,
final RegionServerServices services) throws IOException { final RegionServerServices services) throws IOException {

View File

@ -464,7 +464,8 @@ public class TestMetaTableAccessor {
List<HRegionInfo> regionInfos = Lists.newArrayList(parentA, parentB); List<HRegionInfo> regionInfos = Lists.newArrayList(parentA, parentB);
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3); MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3); MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3,
HConstants.LATEST_TIMESTAMP);
assertEmptyMetaLocation(meta, merged.getRegionName(), 1); assertEmptyMetaLocation(meta, merged.getRegionName(), 1);
assertEmptyMetaLocation(meta, merged.getRegionName(), 2); assertEmptyMetaLocation(meta, merged.getRegionName(), 2);
@ -473,6 +474,9 @@ public class TestMetaTableAccessor {
} }
} }
/**
* Tests whether the maximum of the master's system time and the RS's local system time is used
*/
@Test @Test
public void testMastersSystemTimeIsUsedInUpdateLocations() throws IOException { public void testMastersSystemTimeIsUsedInUpdateLocations() throws IOException {
long regionId = System.currentTimeMillis(); long regionId = System.currentTimeMillis();
@ -509,5 +513,57 @@ public class TestMetaTableAccessor {
meta.close(); meta.close();
} }
} }
@Test
public void testMastersSystemTimeIsUsedInMergeRegions() throws IOException {
long regionId = System.currentTimeMillis();
HRegionInfo regionInfoA = new HRegionInfo(TableName.valueOf("table_foo"),
HConstants.EMPTY_START_ROW, new byte[] {'a'}, false, regionId, 0);
HRegionInfo regionInfoB = new HRegionInfo(TableName.valueOf("table_foo"),
new byte[] {'a'}, HConstants.EMPTY_END_ROW, false, regionId, 0);
HRegionInfo mergedRegionInfo = new HRegionInfo(TableName.valueOf("table_foo"),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false, regionId, 0);
ServerName sn = ServerName.valueOf("bar", 0, 0);
Table meta = MetaTableAccessor.getMetaHTable(connection);
try {
List<HRegionInfo> regionInfos = Lists.newArrayList(regionInfoA, regionInfoB);
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1);
// Write the serverName column with a timestamp far in the future, but set the master's time
// even further ahead. When the region merge deletes the rows for regionA and regionB, the
// serverName columns should no longer be visible to the following get.
long serverNameTime = EnvironmentEdgeManager.currentTime() + 100000000;
long masterSystemTime = EnvironmentEdgeManager.currentTime() + 123456789;
// write the serverName columns
MetaTableAccessor.updateRegionLocation(connection, regionInfoA, sn, 1, serverNameTime);
// assert that we have the serverName column with expected ts
Get get = new Get(mergedRegionInfo.getRegionName());
Result result = meta.get(get);
Cell serverCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY,
MetaTableAccessor.getServerColumn(0));
assertNotNull(serverCell);
assertEquals(serverNameTime, serverCell.getTimestamp());
// now merge the regions, effectively deleting the rows for region a and b.
MetaTableAccessor.mergeRegions(connection, mergedRegionInfo,
regionInfoA, regionInfoB, sn, 1, masterSystemTime);
result = meta.get(get);
serverCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY,
MetaTableAccessor.getServerColumn(0));
Cell startCodeCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY,
MetaTableAccessor.getStartCodeColumn(0));
Cell seqNumCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY,
MetaTableAccessor.getSeqNumColumn(0));
assertNull(serverCell);
assertNull(startCodeCell);
assertNull(seqNumCell);
} finally {
meta.close();
}
}
} }