HBASE-20523 PE tool should support configuring client side buffering sizes

(Ram)
This commit is contained in:
Vasudevan 2018-05-07 12:50:24 +05:30
parent 2373451f94
commit db67906cd0
2 changed files with 32 additions and 1 deletions

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.AsyncConnection; import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable; import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Consistency;
@ -666,6 +667,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
boolean asyncPrefetch = false; boolean asyncPrefetch = false;
boolean cacheBlocks = true; boolean cacheBlocks = true;
Scan.ReadType scanReadType = Scan.ReadType.DEFAULT; Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
long bufferSize = 2l * 1024l * 1024l;
public TestOptions() {} public TestOptions() {}
@ -715,6 +717,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
this.asyncPrefetch = that.asyncPrefetch; this.asyncPrefetch = that.asyncPrefetch;
this.cacheBlocks = that.cacheBlocks; this.cacheBlocks = that.cacheBlocks;
this.scanReadType = that.scanReadType; this.scanReadType = that.scanReadType;
this.bufferSize = that.bufferSize;
} }
public int getCaching() { public int getCaching() {
@ -885,6 +888,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
this.valueSize = valueSize; this.valueSize = valueSize;
} }
public void setBufferSize(long bufferSize) {
this.bufferSize = bufferSize;
}
public void setPeriod(int period) { public void setPeriod(int period) {
this.period = period; this.period = period;
} }
@ -1020,6 +1027,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
public MemoryCompactionPolicy getInMemoryCompaction() { public MemoryCompactionPolicy getInMemoryCompaction() {
return this.inMemoryCompaction; return this.inMemoryCompaction;
} }
public long getBufferSize() {
return this.bufferSize;
}
} }
/* /*
@ -1626,7 +1637,9 @@ public class PerformanceEvaluation extends Configured implements Tool {
@Override @Override
void onStartup() throws IOException { void onStartup() throws IOException {
this.mutator = connection.getBufferedMutator(TableName.valueOf(opts.tableName)); BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName));
p.writeBufferSize(opts.bufferSize);
this.mutator = connection.getBufferedMutator(p);
this.table = connection.getTable(TableName.valueOf(opts.tableName)); this.table = connection.getTable(TableName.valueOf(opts.tableName));
} }
@ -2363,6 +2376,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
System.err.println(" asyncPrefetch Enable asyncPrefetch for scan"); System.err.println(" asyncPrefetch Enable asyncPrefetch for scan");
System.err.println(" cacheBlocks Set the cacheBlocks option for scan. Default: true"); System.err.println(" cacheBlocks Set the cacheBlocks option for scan. Default: true");
System.err.println(" scanReadType Set the readType option for scan, stream/pread/default. Default: default"); System.err.println(" scanReadType Set the readType option for scan, stream/pread/default. Default: default");
System.err.println(" bufferSize Set the value of client side buffering. Default: 2MB");
System.err.println(); System.err.println();
System.err.println(" Note: -D properties will be applied to the conf used. "); System.err.println(" Note: -D properties will be applied to the conf used. ");
System.err.println(" For example: "); System.err.println(" For example: ");
@ -2636,6 +2650,13 @@ public class PerformanceEvaluation extends Configured implements Tool {
Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase()); Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase());
continue; continue;
} }
final String bufferSize = "--bufferSize=";
if (cmd.startsWith(bufferSize)) {
opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length()));
continue;
}
if (isCommandClass(cmd)) { if (isCommandClass(cmd)) {
opts.cmdName = cmd; opts.cmdName = cmd;
try { try {

View File

@ -178,6 +178,16 @@ public class TestPerformanceEvaluation {
assertTrue(median != 0 && median != 1 && median != valueSize); assertTrue(median != 0 && median != 1 && median != valueSize);
} }
@Test
public void testSetBufferSizeOption() {
TestOptions opts = new PerformanceEvaluation.TestOptions();
long bufferSize = opts.getBufferSize();
assertEquals(bufferSize, 2l * 1024l * 1024l);
opts.setBufferSize(64l * 1024l);
bufferSize = opts.getBufferSize();
assertEquals(bufferSize, 64l * 1024l);
}
@Test @Test
public void testParseOptsWithThreads() { public void testParseOptsWithThreads() {
Queue<String> opts = new LinkedList<>(); Queue<String> opts = new LinkedList<>();