HBASE-9330 Refactor PE to create HTable the correct way

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1518341 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jean-Daniel Cryans 2013-08-28 19:46:55 +00:00
parent 9ebaea9f54
commit 89df414955
1 changed files with 25 additions and 9 deletions

View File

@ -44,7 +44,10 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
@ -131,6 +134,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
private boolean writeToWAL = true; private boolean writeToWAL = true;
private boolean inMemoryCF = false; private boolean inMemoryCF = false;
private int presplitRegions = 0; private int presplitRegions = 0;
private HConnection connection;
private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
/** /**
@ -460,7 +464,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(), long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(),
value.getRows(), value.getTotalRows(), value.getRows(), value.getTotalRows(),
value.isFlushCommits(), value.isWriteToWAL(), value.isFlushCommits(), value.isWriteToWAL(),
status); HConnectionManager.createConnection(context.getConfiguration()), status);
// Collect how much time the thing took. Report as map output and // Collect how much time the thing took. Report as map output and
// to the ELAPSED_TIME counter. // to the ELAPSED_TIME counter.
context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime); context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime);
@ -567,6 +571,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
final Compression.Algorithm compression = this.compression; final Compression.Algorithm compression = this.compression;
final boolean writeToWal = this.writeToWAL; final boolean writeToWal = this.writeToWAL;
final int preSplitRegions = this.presplitRegions; final int preSplitRegions = this.presplitRegions;
final HConnection connection = HConnectionManager.createConnection(getConf());
for (int i = 0; i < this.N; i++) { for (int i = 0; i < this.N; i++) {
final int index = i; final int index = i;
Thread t = new Thread ("TestClient-" + i) { Thread t = new Thread ("TestClient-" + i) {
@ -581,10 +586,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
pe.writeToWAL = writeToWal; pe.writeToWAL = writeToWal;
pe.presplitRegions = preSplitRegions; pe.presplitRegions = preSplitRegions;
pe.N = N; pe.N = N;
pe.connection = connection;
try { try {
long elapsedTime = pe.runOneClient(cmd, index * perClientRows, long elapsedTime = pe.runOneClient(cmd, index * perClientRows,
perClientRows, R, perClientRows, R,
flushCommits, writeToWAL, new Status() { flushCommits, writeToWAL, connection, new Status() {
public void setStatus(final String msg) throws IOException { public void setStatus(final String msg) throws IOException {
LOG.info("client-" + getName() + " " + msg); LOG.info("client-" + getName() + " " + msg);
} }
@ -747,13 +753,14 @@ public class PerformanceEvaluation extends Configured implements Tool {
private TableName tableName; private TableName tableName;
private boolean flushCommits; private boolean flushCommits;
private boolean writeToWAL = true; private boolean writeToWAL = true;
private HConnection connection;
TestOptions() { TestOptions() {
} }
TestOptions(int startRow, int perClientRunRows, int totalRows, TestOptions(int startRow, int perClientRunRows, int totalRows,
int numClientThreads, TableName tableName, int numClientThreads, TableName tableName,
boolean flushCommits, boolean writeToWAL) { boolean flushCommits, boolean writeToWAL, HConnection connection) {
this.startRow = startRow; this.startRow = startRow;
this.perClientRunRows = perClientRunRows; this.perClientRunRows = perClientRunRows;
this.totalRows = totalRows; this.totalRows = totalRows;
@ -761,6 +768,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
this.tableName = tableName; this.tableName = tableName;
this.flushCommits = flushCommits; this.flushCommits = flushCommits;
this.writeToWAL = writeToWAL; this.writeToWAL = writeToWAL;
this.connection = connection;
} }
public int getStartRow() { public int getStartRow() {
@ -790,6 +798,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
public boolean isWriteToWAL() { public boolean isWriteToWAL() {
return writeToWAL; return writeToWAL;
} }
public HConnection getConnection() {
return connection;
}
} }
/* /*
@ -811,10 +823,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
protected final int totalRows; protected final int totalRows;
private final Status status; private final Status status;
protected TableName tableName; protected TableName tableName;
protected HTable table; protected HTableInterface table;
protected volatile Configuration conf; protected volatile Configuration conf;
protected boolean flushCommits; protected boolean flushCommits;
protected boolean writeToWAL; protected boolean writeToWAL;
protected HConnection connection;
/** /**
* Note that all subclasses of this class must provide a public contructor * Note that all subclasses of this class must provide a public contructor
@ -831,6 +844,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
this.conf = conf; this.conf = conf;
this.flushCommits = options.isFlushCommits(); this.flushCommits = options.isFlushCommits();
this.writeToWAL = options.isWriteToWAL(); this.writeToWAL = options.isWriteToWAL();
this.connection = options.getConnection();
} }
private String generateStatus(final int sr, final int i, final int lr) { private String generateStatus(final int sr, final int i, final int lr) {
@ -843,9 +857,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
} }
void testSetup() throws IOException { void testSetup() throws IOException {
this.table = new HTable(conf, tableName); this.table = connection.getTable(tableName);
this.table.setAutoFlush(false); this.table.setAutoFlush(false);
this.table.setScannerCaching(30);
} }
void testTakedown() throws IOException { void testTakedown() throws IOException {
@ -1063,6 +1076,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
void testRow(final int i) throws IOException { void testRow(final int i) throws IOException {
if (this.testScanner == null) { if (this.testScanner == null) {
Scan scan = new Scan(format(this.startRow)); Scan scan = new Scan(format(this.startRow));
scan.setCaching(30);
scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
this.testScanner = table.getScanner(scan); this.testScanner = table.getScanner(scan);
} }
@ -1184,7 +1198,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
long runOneClient(final Class<? extends Test> cmd, final int startRow, long runOneClient(final Class<? extends Test> cmd, final int startRow,
final int perClientRunRows, final int totalRows, final int perClientRunRows, final int totalRows,
boolean flushCommits, boolean writeToWAL, boolean flushCommits, boolean writeToWAL, HConnection connection,
final Status status) final Status status)
throws IOException { throws IOException {
status.setStatus("Start " + cmd + " at offset " + startRow + " for " + status.setStatus("Start " + cmd + " at offset " + startRow + " for " +
@ -1192,7 +1206,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
long totalElapsedTime = 0; long totalElapsedTime = 0;
TestOptions options = new TestOptions(startRow, perClientRunRows, TestOptions options = new TestOptions(startRow, perClientRunRows,
totalRows, N, tableName, flushCommits, writeToWAL); totalRows, N, tableName, flushCommits, writeToWAL, connection);
final Test t; final Test t;
try { try {
Constructor<? extends Test> constructor = cmd.getDeclaredConstructor( Constructor<? extends Test> constructor = cmd.getDeclaredConstructor(
@ -1224,7 +1238,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
try { try {
admin = new HBaseAdmin(getConf()); admin = new HBaseAdmin(getConf());
checkTable(admin); checkTable(admin);
runOneClient(cmd, 0, this.R, this.R, this.flushCommits, this.writeToWAL, runOneClient(cmd, 0, this.R, this.R, this.flushCommits, this.writeToWAL, this.connection,
status); status);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Failed", e); LOG.error("Failed", e);
@ -1408,6 +1422,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
continue; continue;
} }
this.connection = HConnectionManager.createConnection(getConf());
Class<? extends Test> cmdClass = determineCommandClass(cmd); Class<? extends Test> cmdClass = determineCommandClass(cmd);
if (cmdClass != null) { if (cmdClass != null) {
getArgs(i + 1, args); getArgs(i + 1, args);