HBASE-12986 Compaction pressure based client pushback

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
chenheng 2015-10-29 11:54:21 +08:00 committed by Andrew Purtell
parent e4bf77e2de
commit 6871724104
6 changed files with 200 additions and 37 deletions

View File

@ -70,6 +70,9 @@ public class ExponentialClientBackoffPolicy implements ClientBackoffPolicy {
// Factor in heap occupancy // Factor in heap occupancy
float heapOccupancy = regionStats.getHeapOccupancyPercent() / 100.0f; float heapOccupancy = regionStats.getHeapOccupancyPercent() / 100.0f;
// Factor in compaction pressure, 1.0 means heavy compaction pressure
float compactionPressure = regionStats.getCompactionPressure() / 100.0f;
if (heapOccupancy >= heapOccupancyLowWatermark) { if (heapOccupancy >= heapOccupancyLowWatermark) {
// If we are higher than the high watermark, we are already applying max // If we are higher than the high watermark, we are already applying max
// backoff and cannot scale more (see scale() below) // backoff and cannot scale more (see scale() below)
@ -80,7 +83,7 @@ public class ExponentialClientBackoffPolicy implements ClientBackoffPolicy {
scale(heapOccupancy, heapOccupancyLowWatermark, heapOccupancyHighWatermark, scale(heapOccupancy, heapOccupancyLowWatermark, heapOccupancyHighWatermark,
0.1, 1.0)); 0.1, 1.0));
} }
percent = Math.max(percent, compactionPressure);
// square the percent as a value less than 1. Closer we move to 100 percent, // square the percent as a value less than 1. Closer we move to 100 percent,
// the percent moves to 1, but squaring causes the exponential curve // the percent moves to 1, but squaring causes the exponential curve
double multiplier = Math.pow(percent, 4.0); double multiplier = Math.pow(percent, 4.0);

View File

@ -57,10 +57,12 @@ public class ServerStatistics {
public static class RegionStatistics { public static class RegionStatistics {
private int memstoreLoad = 0; private int memstoreLoad = 0;
private int heapOccupancy = 0; private int heapOccupancy = 0;
private int compactionPressure = 0;
public void update(ClientProtos.RegionLoadStats currentStats) { public void update(ClientProtos.RegionLoadStats currentStats) {
this.memstoreLoad = currentStats.getMemstoreLoad(); this.memstoreLoad = currentStats.getMemstoreLoad();
this.heapOccupancy = currentStats.getHeapOccupancy(); this.heapOccupancy = currentStats.getHeapOccupancy();
this.compactionPressure = currentStats.getCompactionPressure();
} }
public int getMemstoreLoadPercent(){ public int getMemstoreLoadPercent(){
@ -70,5 +72,10 @@ public class ServerStatistics {
public int getHeapOccupancyPercent(){ public int getHeapOccupancyPercent(){
return this.heapOccupancy; return this.heapOccupancy;
} }
public int getCompactionPressure() {
return compactionPressure;
}
} }
} }

View File

@ -113,22 +113,46 @@ public class TestClientExponentialBackoff {
ServerStatistics stats = new ServerStatistics(); ServerStatistics stats = new ServerStatistics();
long backoffTime; long backoffTime;
update(stats, 0, 95); update(stats, 0, 95, 0);
backoffTime = backoff.getBackoffTime(server, regionname, stats); backoffTime = backoff.getBackoffTime(server, regionname, stats);
assertTrue("Heap occupancy at low watermark had no effect", backoffTime > 0); assertTrue("Heap occupancy at low watermark had no effect", backoffTime > 0);
long previous = backoffTime; long previous = backoffTime;
update(stats, 0, 96); update(stats, 0, 96, 0);
backoffTime = backoff.getBackoffTime(server, regionname, stats); backoffTime = backoff.getBackoffTime(server, regionname, stats);
assertTrue("Increase above low watermark should have increased backoff", assertTrue("Increase above low watermark should have increased backoff",
backoffTime > previous); backoffTime > previous);
update(stats, 0, 98); update(stats, 0, 98, 0);
backoffTime = backoff.getBackoffTime(server, regionname, stats); backoffTime = backoff.getBackoffTime(server, regionname, stats);
assertEquals("We should be using max backoff when at high watermark", backoffTime, assertEquals("We should be using max backoff when at high watermark", backoffTime,
ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF); ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF);
} }
@Test
public void testCompactionPressurePolicy() {
Configuration conf = new Configuration(false);
ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf);
ServerStatistics stats = new ServerStatistics();
long backoffTime;
update(stats, 0, 0, 0);
backoffTime = backoff.getBackoffTime(server, regionname, stats);
assertTrue("Compaction pressure has no effect", backoffTime == 0);
long previous = backoffTime;
update(stats, 0, 0, 50);
backoffTime = backoff.getBackoffTime(server, regionname, stats);
assertTrue("Compaction pressure should be bigger",
backoffTime > previous);
update(stats, 0, 0, 100);
backoffTime = backoff.getBackoffTime(server, regionname, stats);
assertEquals("under heavy compaction pressure", backoffTime,
ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF);
}
private void update(ServerStatistics stats, int load) { private void update(ServerStatistics stats, int load) {
ClientProtos.RegionLoadStats stat = ClientProtos.RegionLoadStats.newBuilder() ClientProtos.RegionLoadStats stat = ClientProtos.RegionLoadStats.newBuilder()
.setMemstoreLoad .setMemstoreLoad
@ -136,10 +160,12 @@ public class TestClientExponentialBackoff {
stats.update(regionname, stat); stats.update(regionname, stat);
} }
private void update(ServerStatistics stats, int memstoreLoad, int heapOccupancy) { private void update(ServerStatistics stats, int memstoreLoad, int heapOccupancy,
int compactionPressure) {
ClientProtos.RegionLoadStats stat = ClientProtos.RegionLoadStats.newBuilder() ClientProtos.RegionLoadStats stat = ClientProtos.RegionLoadStats.newBuilder()
.setMemstoreLoad(memstoreLoad) .setMemstoreLoad(memstoreLoad)
.setHeapOccupancy(heapOccupancy) .setHeapOccupancy(heapOccupancy)
.setCompactionPressure(compactionPressure)
.build(); .build();
stats.update(regionname, stat); stats.update(regionname, stat);
} }

View File

@ -27489,6 +27489,24 @@ public final class ClientProtos {
* </pre> * </pre>
*/ */
int getHeapOccupancy(); int getHeapOccupancy();
// optional int32 compactionPressure = 3 [default = 0];
/**
* <code>optional int32 compactionPressure = 3 [default = 0];</code>
*
* <pre>
* Compaction pressure. Guaranteed to be positive, between 0 and 100.
* </pre>
*/
boolean hasCompactionPressure();
/**
* <code>optional int32 compactionPressure = 3 [default = 0];</code>
*
* <pre>
* Compaction pressure. Guaranteed to be positive, between 0 and 100.
* </pre>
*/
int getCompactionPressure();
} }
/** /**
* Protobuf type {@code hbase.pb.RegionLoadStats} * Protobuf type {@code hbase.pb.RegionLoadStats}
@ -27556,6 +27574,11 @@ public final class ClientProtos {
heapOccupancy_ = input.readInt32(); heapOccupancy_ = input.readInt32();
break; break;
} }
case 24: {
bitField0_ |= 0x00000004;
compactionPressure_ = input.readInt32();
break;
}
} }
} }
} catch (com.google.protobuf.InvalidProtocolBufferException e) { } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -27646,9 +27669,34 @@ public final class ClientProtos {
return heapOccupancy_; return heapOccupancy_;
} }
// optional int32 compactionPressure = 3 [default = 0];
public static final int COMPACTIONPRESSURE_FIELD_NUMBER = 3;
private int compactionPressure_;
/**
* <code>optional int32 compactionPressure = 3 [default = 0];</code>
*
* <pre>
* Compaction pressure. Guaranteed to be positive, between 0 and 100.
* </pre>
*/
public boolean hasCompactionPressure() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional int32 compactionPressure = 3 [default = 0];</code>
*
* <pre>
* Compaction pressure. Guaranteed to be positive, between 0 and 100.
* </pre>
*/
public int getCompactionPressure() {
return compactionPressure_;
}
private void initFields() { private void initFields() {
memstoreLoad_ = 0; memstoreLoad_ = 0;
heapOccupancy_ = 0; heapOccupancy_ = 0;
compactionPressure_ = 0;
} }
private byte memoizedIsInitialized = -1; private byte memoizedIsInitialized = -1;
public final boolean isInitialized() { public final boolean isInitialized() {
@ -27668,6 +27716,9 @@ public final class ClientProtos {
if (((bitField0_ & 0x00000002) == 0x00000002)) { if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeInt32(2, heapOccupancy_); output.writeInt32(2, heapOccupancy_);
} }
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeInt32(3, compactionPressure_);
}
getUnknownFields().writeTo(output); getUnknownFields().writeTo(output);
} }
@ -27685,6 +27736,10 @@ public final class ClientProtos {
size += com.google.protobuf.CodedOutputStream size += com.google.protobuf.CodedOutputStream
.computeInt32Size(2, heapOccupancy_); .computeInt32Size(2, heapOccupancy_);
} }
if (((bitField0_ & 0x00000004) == 0x00000004)) {
size += com.google.protobuf.CodedOutputStream
.computeInt32Size(3, compactionPressure_);
}
size += getUnknownFields().getSerializedSize(); size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size; memoizedSerializedSize = size;
return size; return size;
@ -27718,6 +27773,11 @@ public final class ClientProtos {
result = result && (getHeapOccupancy() result = result && (getHeapOccupancy()
== other.getHeapOccupancy()); == other.getHeapOccupancy());
} }
result = result && (hasCompactionPressure() == other.hasCompactionPressure());
if (hasCompactionPressure()) {
result = result && (getCompactionPressure()
== other.getCompactionPressure());
}
result = result && result = result &&
getUnknownFields().equals(other.getUnknownFields()); getUnknownFields().equals(other.getUnknownFields());
return result; return result;
@ -27739,6 +27799,10 @@ public final class ClientProtos {
hash = (37 * hash) + HEAPOCCUPANCY_FIELD_NUMBER; hash = (37 * hash) + HEAPOCCUPANCY_FIELD_NUMBER;
hash = (53 * hash) + getHeapOccupancy(); hash = (53 * hash) + getHeapOccupancy();
} }
if (hasCompactionPressure()) {
hash = (37 * hash) + COMPACTIONPRESSURE_FIELD_NUMBER;
hash = (53 * hash) + getCompactionPressure();
}
hash = (29 * hash) + getUnknownFields().hashCode(); hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash; memoizedHashCode = hash;
return hash; return hash;
@ -27857,6 +27921,8 @@ public final class ClientProtos {
bitField0_ = (bitField0_ & ~0x00000001); bitField0_ = (bitField0_ & ~0x00000001);
heapOccupancy_ = 0; heapOccupancy_ = 0;
bitField0_ = (bitField0_ & ~0x00000002); bitField0_ = (bitField0_ & ~0x00000002);
compactionPressure_ = 0;
bitField0_ = (bitField0_ & ~0x00000004);
return this; return this;
} }
@ -27893,6 +27959,10 @@ public final class ClientProtos {
to_bitField0_ |= 0x00000002; to_bitField0_ |= 0x00000002;
} }
result.heapOccupancy_ = heapOccupancy_; result.heapOccupancy_ = heapOccupancy_;
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
to_bitField0_ |= 0x00000004;
}
result.compactionPressure_ = compactionPressure_;
result.bitField0_ = to_bitField0_; result.bitField0_ = to_bitField0_;
onBuilt(); onBuilt();
return result; return result;
@ -27915,6 +27985,9 @@ public final class ClientProtos {
if (other.hasHeapOccupancy()) { if (other.hasHeapOccupancy()) {
setHeapOccupancy(other.getHeapOccupancy()); setHeapOccupancy(other.getHeapOccupancy());
} }
if (other.hasCompactionPressure()) {
setCompactionPressure(other.getCompactionPressure());
}
this.mergeUnknownFields(other.getUnknownFields()); this.mergeUnknownFields(other.getUnknownFields());
return this; return this;
} }
@ -28044,6 +28117,55 @@ public final class ClientProtos {
return this; return this;
} }
// optional int32 compactionPressure = 3 [default = 0];
private int compactionPressure_ ;
/**
* <code>optional int32 compactionPressure = 3 [default = 0];</code>
*
* <pre>
* Compaction pressure. Guaranteed to be positive, between 0 and 100.
* </pre>
*/
public boolean hasCompactionPressure() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional int32 compactionPressure = 3 [default = 0];</code>
*
* <pre>
* Compaction pressure. Guaranteed to be positive, between 0 and 100.
* </pre>
*/
public int getCompactionPressure() {
return compactionPressure_;
}
/**
* <code>optional int32 compactionPressure = 3 [default = 0];</code>
*
* <pre>
* Compaction pressure. Guaranteed to be positive, between 0 and 100.
* </pre>
*/
public Builder setCompactionPressure(int value) {
bitField0_ |= 0x00000004;
compactionPressure_ = value;
onChanged();
return this;
}
/**
* <code>optional int32 compactionPressure = 3 [default = 0];</code>
*
* <pre>
* Compaction pressure. Guaranteed to be positive, between 0 and 100.
* </pre>
*/
public Builder clearCompactionPressure() {
bitField0_ = (bitField0_ & ~0x00000004);
compactionPressure_ = 0;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:hbase.pb.RegionLoadStats) // @@protoc_insertion_point(builder_scope:hbase.pb.RegionLoadStats)
} }
@ -33308,38 +33430,39 @@ public final class ClientProtos {
"oprocessorServiceCall\"k\n\014RegionAction\022)\n", "oprocessorServiceCall\"k\n\014RegionAction\022)\n",
"\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifier" + "\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifier" +
"\022\016\n\006atomic\030\002 \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase" + "\022\016\n\006atomic\030\002 \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase" +
".pb.Action\"D\n\017RegionLoadStats\022\027\n\014memstor" + ".pb.Action\"c\n\017RegionLoadStats\022\027\n\014memstor" +
"eLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010" + "eLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010" +
"\"\332\001\n\021ResultOrException\022\r\n\005index\030\001 \001(\r\022 \n" + "\022\035\n\022compactionPressure\030\003 \001(\005:\0010\"\332\001\n\021Resu" +
"\006result\030\002 \001(\0132\020.hbase.pb.Result\022*\n\texcep" + "ltOrException\022\r\n\005index\030\001 \001(\r\022 \n\006result\030\002" +
"tion\030\003 \001(\0132\027.hbase.pb.NameBytesPair\022:\n\016s" + " \001(\0132\020.hbase.pb.Result\022*\n\texception\030\003 \001(" +
"ervice_result\030\004 \001(\0132\".hbase.pb.Coprocess" + "\0132\027.hbase.pb.NameBytesPair\022:\n\016service_re" +
"orServiceResult\022,\n\tloadStats\030\005 \001(\0132\031.hba" + "sult\030\004 \001(\0132\".hbase.pb.CoprocessorService" +
"se.pb.RegionLoadStats\"x\n\022RegionActionRes", "Result\022,\n\tloadStats\030\005 \001(\0132\031.hbase.pb.Reg",
"ult\0226\n\021resultOrException\030\001 \003(\0132\033.hbase.p" + "ionLoadStats\"x\n\022RegionActionResult\0226\n\021re" +
"b.ResultOrException\022*\n\texception\030\002 \001(\0132\027" + "sultOrException\030\001 \003(\0132\033.hbase.pb.ResultO" +
".hbase.pb.NameBytesPair\"x\n\014MultiRequest\022" + "rException\022*\n\texception\030\002 \001(\0132\027.hbase.pb" +
",\n\014regionAction\030\001 \003(\0132\026.hbase.pb.RegionA" + ".NameBytesPair\"x\n\014MultiRequest\022,\n\014region" +
"ction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\tcondition\030\003" + "Action\030\001 \003(\0132\026.hbase.pb.RegionAction\022\022\n\n" +
" \001(\0132\023.hbase.pb.Condition\"\\\n\rMultiRespon" + "nonceGroup\030\002 \001(\004\022&\n\tcondition\030\003 \001(\0132\023.hb" +
"se\0228\n\022regionActionResult\030\001 \003(\0132\034.hbase.p" + "ase.pb.Condition\"\\\n\rMultiResponse\0228\n\022reg" +
"b.RegionActionResult\022\021\n\tprocessed\030\002 \001(\010*" + "ionActionResult\030\001 \003(\0132\034.hbase.pb.RegionA" +
"\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\001" + "ctionResult\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Consis" +
"2\203\004\n\rClientService\0222\n\003Get\022\024.hbase.pb.Get", "tency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\203\004\n\rClie",
"Request\032\025.hbase.pb.GetResponse\022;\n\006Mutate" + "ntService\0222\n\003Get\022\024.hbase.pb.GetRequest\032\025" +
"\022\027.hbase.pb.MutateRequest\032\030.hbase.pb.Mut" + ".hbase.pb.GetResponse\022;\n\006Mutate\022\027.hbase." +
"ateResponse\0225\n\004Scan\022\025.hbase.pb.ScanReque" + "pb.MutateRequest\032\030.hbase.pb.MutateRespon" +
"st\032\026.hbase.pb.ScanResponse\022P\n\rBulkLoadHF" + "se\0225\n\004Scan\022\025.hbase.pb.ScanRequest\032\026.hbas" +
"ile\022\036.hbase.pb.BulkLoadHFileRequest\032\037.hb" + "e.pb.ScanResponse\022P\n\rBulkLoadHFile\022\036.hba" +
"ase.pb.BulkLoadHFileResponse\022X\n\013ExecServ" + "se.pb.BulkLoadHFileRequest\032\037.hbase.pb.Bu" +
"ice\022#.hbase.pb.CoprocessorServiceRequest" + "lkLoadHFileResponse\022X\n\013ExecService\022#.hba" +
"\032$.hbase.pb.CoprocessorServiceResponse\022d" + "se.pb.CoprocessorServiceRequest\032$.hbase." +
"\n\027ExecRegionServerService\022#.hbase.pb.Cop" + "pb.CoprocessorServiceResponse\022d\n\027ExecReg" +
"rocessorServiceRequest\032$.hbase.pb.Coproc", "ionServerService\022#.hbase.pb.CoprocessorS",
"essorServiceResponse\0228\n\005Multi\022\026.hbase.pb" + "erviceRequest\032$.hbase.pb.CoprocessorServ" +
".MultiRequest\032\027.hbase.pb.MultiResponseBB" + "iceResponse\0228\n\005Multi\022\026.hbase.pb.MultiReq" +
"\n*org.apache.hadoop.hbase.protobuf.gener" + "uest\032\027.hbase.pb.MultiResponseBB\n*org.apa" +
"atedB\014ClientProtosH\001\210\001\001\240\001\001" "che.hadoop.hbase.protobuf.generatedB\014Cli" +
"entProtosH\001\210\001\001\240\001\001"
}; };
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -33501,7 +33624,7 @@ public final class ClientProtos {
internal_static_hbase_pb_RegionLoadStats_fieldAccessorTable = new internal_static_hbase_pb_RegionLoadStats_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable( com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_hbase_pb_RegionLoadStats_descriptor, internal_static_hbase_pb_RegionLoadStats_descriptor,
new java.lang.String[] { "MemstoreLoad", "HeapOccupancy", }); new java.lang.String[] { "MemstoreLoad", "HeapOccupancy", "CompactionPressure", });
internal_static_hbase_pb_ResultOrException_descriptor = internal_static_hbase_pb_ResultOrException_descriptor =
getDescriptor().getMessageTypes().get(23); getDescriptor().getMessageTypes().get(23);
internal_static_hbase_pb_ResultOrException_fieldAccessorTable = new internal_static_hbase_pb_ResultOrException_fieldAccessorTable = new

View File

@ -395,6 +395,8 @@ message RegionLoadStats {
// Percent JVM heap occupancy. Guaranteed to be positive, between 0 and 100. // Percent JVM heap occupancy. Guaranteed to be positive, between 0 and 100.
// We can move this to "ServerLoadStats" should we develop them. // We can move this to "ServerLoadStats" should we develop them.
optional int32 heapOccupancy = 2 [default = 0]; optional int32 heapOccupancy = 2 [default = 0];
// Compaction pressure. Guaranteed to be positive, between 0 and 100.
optional int32 compactionPressure = 3 [default = 0];
} }
/** /**

View File

@ -6706,6 +6706,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreSize.get() * 100) / this stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreSize.get() * 100) / this
.memstoreFlushSize))); .memstoreFlushSize)));
stats.setHeapOccupancy((int)rsServices.getHeapMemoryManager().getHeapOccupancyPercent()*100); stats.setHeapOccupancy((int)rsServices.getHeapMemoryManager().getHeapOccupancyPercent()*100);
stats.setCompactionPressure((int)rsServices.getCompactionPressure()*100 > 100 ? 100 :
(int)rsServices.getCompactionPressure()*100);
return stats.build(); return stats.build();
} }