HBASE-10810 LoadTestTool should share the connection and connection pool

git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1595078 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Enis Soztutar 2014-05-15 23:48:32 +00:00
parent e50811a7ab
commit 25baace0de
10 changed files with 80 additions and 62 deletions

View File

@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.util.LoadTestTool;
@ -236,7 +236,7 @@ public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends Integr
protected Thread timeoutThread;
public TimeBoundedMultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
TableName tableName, double verifyPercent) {
TableName tableName, double verifyPercent) throws IOException {
super(dataGen, conf, tableName, verifyPercent);
long timeoutMs = conf.getLong(
String.format("%s.%s", TEST_NAME, GET_TIMEOUT_KEY), DEFAULT_GET_TIMEOUT);
@ -324,7 +324,7 @@ public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends Integr
@Override
protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
Result[] results, HTable table, boolean isNullExpected)
Result[] results, HTableInterface table, boolean isNullExpected)
throws IOException {
super.verifyResultsAndUpdateMetrics(verify, gets, elapsedNano, results, table, isNullExpected);
// we actually do not timeout and cancel the reads after timeout. We just wait for the RPC

View File

@ -34,6 +34,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
@ -53,6 +55,7 @@ public abstract class MultiThreadedAction {
protected final TableName tableName;
protected final Configuration conf;
protected final HConnection connection; // all reader / writer threads will share this connection
protected int numThreads = 1;
@ -142,11 +145,12 @@ public abstract class MultiThreadedAction {
public MultiThreadedAction(LoadTestDataGenerator dataGen, Configuration conf,
TableName tableName,
String actionLetter) {
String actionLetter) throws IOException {
this.conf = conf;
this.dataGenerator = dataGen;
this.tableName = tableName;
this.actionLetter = actionLetter;
this.connection = HConnectionManager.createConnection(conf);
}
public void start(long startKey, long endKey, int numThreads) throws IOException {
@ -243,10 +247,21 @@ public abstract class MultiThreadedAction {
}
}
public void close() {
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
LOG.warn("Could not close the connection: " + ex);
}
}
}
public void waitForFinish() {
while (numThreadsWorking.get() != 0) {
Threads.sleepWithoutInterrupt(1000);
}
close();
}
public boolean isDone() {

View File

@ -29,7 +29,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
@ -79,7 +80,7 @@ public class MultiThreadedReader extends MultiThreadedAction
private int batchSize = DEFAULT_BATCH_SIZE;
public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
TableName tableName, double verifyPercent) {
TableName tableName, double verifyPercent) throws IOException {
super(dataGen, conf, tableName, "R");
this.verifyPercent = verifyPercent;
}
@ -127,7 +128,7 @@ public class MultiThreadedReader extends MultiThreadedAction
public class HBaseReaderThread extends Thread {
protected final int readerId;
protected final HTable table;
protected final HTableInterface table;
/** The "current" key being read. Increases from startKey to endKey. */
private long curKey;
@ -150,8 +151,8 @@ public class MultiThreadedReader extends MultiThreadedAction
setName(getClass().getSimpleName() + "_" + readerId);
}
protected HTable createTable() throws IOException {
return new HTable(conf, tableName);
protected HTableInterface createTable() throws IOException {
return connection.getTable(tableName);
}
@Override
@ -342,7 +343,7 @@ public class MultiThreadedReader extends MultiThreadedAction
}
protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
Result[] results, HTable table, boolean isNullExpected)
Result[] results, HTableInterface table, boolean isNullExpected)
throws IOException {
totalOpTimeMs.addAndGet(elapsedNano / 1000000);
numKeys.addAndGet(gets.length);
@ -354,23 +355,23 @@ public class MultiThreadedReader extends MultiThreadedAction
}
protected void verifyResultsAndUpdateMetrics(boolean verify, Get get, long elapsedNano,
Result result, HTable table, boolean isNullExpected)
Result result, HTableInterface table, boolean isNullExpected)
throws IOException {
verifyResultsAndUpdateMetrics(verify, new Get[]{get}, elapsedNano,
new Result[]{result}, table, isNullExpected);
}
private void verifyResultsAndUpdateMetricsOnAPerGetBasis(boolean verify, Get get,
Result result, HTable table, boolean isNullExpected) throws IOException {
Result result, HTableInterface table, boolean isNullExpected) throws IOException {
if (!result.isEmpty()) {
if (verify) {
numKeysVerified.incrementAndGet();
}
} else {
HRegionLocation hloc = table.getRegionLocation(get.getRow());
HRegionLocation hloc = connection.getRegionLocation(tableName,
get.getRow(), false);
String rowKey = Bytes.toString(get.getRow());
LOG.info("Key = " + rowKey + ", RegionServer: "
+ hloc.getHostname());
LOG.info("Key = " + rowKey + ", Region location: " + hloc);
if(isNullExpected) {
nullResult.incrementAndGet();
LOG.debug("Null result obtained for the key ="+rowKey);

View File

@ -26,7 +26,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
@ -43,12 +43,12 @@ public class MultiThreadedReaderWithACL extends MultiThreadedReader {
* Maps user with Table instance. Because the table instance has to be created
* per user inorder to work in that user's context
*/
private Map<String, HTable> userVsTable = new HashMap<String, HTable>();
private Map<String, HTableInterface> userVsTable = new HashMap<String, HTableInterface>();
private Map<String, User> users = new HashMap<String, User>();
private String[] userNames;
public MultiThreadedReaderWithACL(LoadTestDataGenerator dataGen, Configuration conf,
TableName tableName, double verifyPercent, String userNames) {
TableName tableName, double verifyPercent, String userNames) throws IOException {
super(dataGen, conf, tableName, verifyPercent);
this.userNames = userNames.split(COMMA);
}
@ -68,13 +68,13 @@ public class MultiThreadedReaderWithACL extends MultiThreadedReader {
}
@Override
protected HTable createTable() throws IOException {
protected HTableInterface createTable() throws IOException {
return null;
}
@Override
protected void closeTable() {
for (HTable table : userVsTable.values()) {
for (HTableInterface table : userVsTable.values()) {
try {
table.close();
} catch (Exception e) {
@ -93,14 +93,14 @@ public class MultiThreadedReaderWithACL extends MultiThreadedReader {
PrivilegedExceptionAction<Object> action = new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
HTable localTable = null;
HTableInterface localTable = null;
try {
get.setACLStrategy(true);
Result result = null;
int specialPermCellInsertionFactor = Integer.parseInt(dataGenerator.getArgs()[2]);
int mod = ((int) keyToRead % userNames.length);
if (userVsTable.get(userNames[mod]) == null) {
localTable = new HTable(conf, tableName);
localTable = connection.getTable(tableName);
userVsTable.put(userNames[mod], localTable);
result = localTable.get(get);
} else {

View File

@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
@ -65,7 +65,7 @@ public class MultiThreadedUpdater extends MultiThreadedWriterBase {
private final double updatePercent;
public MultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf,
TableName tableName, double updatePercent) {
TableName tableName, double updatePercent) throws IOException {
super(dataGen, conf, tableName, "U");
this.updatePercent = updatePercent;
}
@ -122,17 +122,18 @@ public class MultiThreadedUpdater extends MultiThreadedWriterBase {
}
protected class HBaseUpdaterThread extends Thread {
protected final HTable table;
protected final HTableInterface table;
public HBaseUpdaterThread(int updaterId) throws IOException {
setName(getClass().getSimpleName() + "_" + updaterId);
table = createTable();
}
protected HTable createTable() throws IOException {
return new HTable(conf, tableName);
protected HTableInterface createTable() throws IOException {
return connection.getTable(tableName);
}
@Override
public void run() {
try {
long rowKeyBase;
@ -273,11 +274,11 @@ public class MultiThreadedUpdater extends MultiThreadedWriterBase {
return result;
}
public void mutate(HTable table, Mutation m, long keyBase) {
public void mutate(HTableInterface table, Mutation m, long keyBase) {
mutate(table, m, keyBase, null, null, null, null);
}
public void mutate(HTable table, Mutation m,
public void mutate(HTableInterface table, Mutation m,
long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) {
long start = System.currentTimeMillis();
try {
@ -326,11 +327,11 @@ public class MultiThreadedUpdater extends MultiThreadedWriterBase {
}
}
public void mutate(HTable table, Mutation m, long keyBase) {
public void mutate(HTableInterface table, Mutation m, long keyBase) {
mutate(table, m, keyBase, null, null, null, null);
}
public void mutate(HTable table, Mutation m,
public void mutate(HTableInterface table, Mutation m,
long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) {
long start = System.currentTimeMillis();
try {

View File

@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
@ -53,12 +53,13 @@ public class MultiThreadedUpdaterWithACL extends MultiThreadedUpdater {
* Maps user with Table instance. Because the table instance has to be created
* per user inorder to work in that user's context
*/
private Map<String, HTable> userVsTable = new HashMap<String, HTable>();
private Map<String, HTableInterface> userVsTable = new HashMap<String, HTableInterface>();
private Map<String, User> users = new HashMap<String, User>();
private String[] userNames;
public MultiThreadedUpdaterWithACL(LoadTestDataGenerator dataGen, Configuration conf,
TableName tableName, double updatePercent, User userOwner, String userNames) {
TableName tableName, double updatePercent, User userOwner, String userNames)
throws IOException {
super(dataGen, conf, tableName, updatePercent);
this.userOwner = userOwner;
this.userNames = userNames.split(COMMA);
@ -74,7 +75,7 @@ public class MultiThreadedUpdaterWithACL extends MultiThreadedUpdater {
public class HBaseUpdaterThreadWithACL extends HBaseUpdaterThread {
private HTable table;
private HTableInterface table;
private MutateAccessAction mutateAction = new MutateAccessAction();
public HBaseUpdaterThreadWithACL(int updaterId) throws IOException {
@ -82,7 +83,7 @@ public class MultiThreadedUpdaterWithACL extends MultiThreadedUpdater {
}
@Override
protected HTable createTable() throws IOException {
protected HTableInterface createTable() throws IOException {
return null;
}
@ -92,7 +93,7 @@ public class MultiThreadedUpdaterWithACL extends MultiThreadedUpdater {
if (table != null) {
table.close();
}
for (HTable table : userVsTable.values()) {
for (HTableInterface table : userVsTable.values()) {
try {
table.close();
} catch (Exception e) {
@ -111,11 +112,11 @@ public class MultiThreadedUpdaterWithACL extends MultiThreadedUpdater {
@Override
public Object run() throws Exception {
Result res = null;
HTable localTable = null;
HTableInterface localTable = null;
try {
int mod = ((int) rowKeyBase % userNames.length);
if (userVsTable.get(userNames[mod]) == null) {
localTable = new HTable(conf, tableName);
localTable = connection.getTable(tableName);
userVsTable.put(userNames[mod], localTable);
res = localTable.get(get);
} else {
@ -158,7 +159,7 @@ public class MultiThreadedUpdaterWithACL extends MultiThreadedUpdater {
}
@Override
public void mutate(final HTable table, Mutation m, final long keyBase, final byte[] row,
public void mutate(final HTableInterface table, Mutation m, final long keyBase, final byte[] row,
final byte[] cf, final byte[] q, final byte[] v) {
final long start = System.currentTimeMillis();
try {
@ -179,7 +180,7 @@ public class MultiThreadedUpdaterWithACL extends MultiThreadedUpdater {
}
class MutateAccessAction implements PrivilegedExceptionAction<Object> {
private HTable table;
private HTableInterface table;
private long start;
private Mutation m;
private long keyBase;
@ -224,7 +225,7 @@ public class MultiThreadedUpdaterWithACL extends MultiThreadedUpdater {
public Object run() throws Exception {
try {
if (table == null) {
table = new HTable(conf, tableName);
table = connection.getTable(tableName);
}
if (m instanceof Increment) {
table.increment((Increment) m);

View File

@ -33,7 +33,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
@ -48,7 +48,7 @@ public class MultiThreadedWriter extends MultiThreadedWriterBase {
protected boolean isMultiPut = false;
public MultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf,
TableName tableName) {
TableName tableName) throws IOException {
super(dataGen, conf, tableName, "W");
}
@ -79,15 +79,15 @@ public class MultiThreadedWriter extends MultiThreadedWriterBase {
}
public class HBaseWriterThread extends Thread {
private final HTable table;
private final HTableInterface table;
public HBaseWriterThread(int writerId) throws IOException {
setName(getClass().getSimpleName() + "_" + writerId);
table = createTable();
}
protected HTable createTable() throws IOException {
return new HTable(conf, tableName);
protected HTableInterface createTable() throws IOException {
return connection.getTable(tableName);
}
@Override
@ -138,7 +138,7 @@ public class MultiThreadedWriter extends MultiThreadedWriterBase {
}
}
public void insert(HTable table, Put put, long keyBase) {
public void insert(HTableInterface table, Put put, long keyBase) {
long start = System.currentTimeMillis();
try {
put = (Put) dataGenerator.beforeMutate(keyBase, put);

View File

@ -33,7 +33,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
/** Creates multiple threads that write key/values into the */
@ -73,7 +73,7 @@ public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
protected boolean trackWroteKeys;
public MultiThreadedWriterBase(LoadTestDataGenerator dataGen, Configuration conf,
TableName tableName, String actionLetter) {
TableName tableName, String actionLetter) throws IOException {
super(dataGen, conf, tableName, actionLetter);
}
@ -89,11 +89,11 @@ public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
}
}
protected String getRegionDebugInfoSafe(HTable table, byte[] rowKey) {
protected String getRegionDebugInfoSafe(HTableInterface table, byte[] rowKey) {
HRegionLocation cached = null, real = null;
try {
cached = table.getRegionLocation(rowKey, false);
real = table.getRegionLocation(rowKey, true);
cached = connection.getRegionLocation(tableName, rowKey, false);
real = connection.getRegionLocation(tableName, rowKey, true);
} catch (Throwable t) {
// Cannot obtain region information for another catch block - too bad!
}

View File

@ -26,7 +26,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.security.User;
@ -42,7 +42,7 @@ public class MultiThreadedWriterWithACL extends MultiThreadedWriter {
private User userOwner;
public MultiThreadedWriterWithACL(LoadTestDataGenerator dataGen, Configuration conf,
TableName tableName, User userOwner) {
TableName tableName, User userOwner) throws IOException {
super(dataGen, conf, tableName);
this.userOwner = userOwner;
}
@ -62,7 +62,7 @@ public class MultiThreadedWriterWithACL extends MultiThreadedWriter {
public class HBaseWriterThreadWithACL extends HBaseWriterThread {
private HTable table;
private HTableInterface table;
private WriteAccessAction writerAction = new WriteAccessAction();
public HBaseWriterThreadWithACL(int writerId) throws IOException {
@ -70,7 +70,7 @@ public class MultiThreadedWriterWithACL extends MultiThreadedWriter {
}
@Override
protected HTable createTable() throws IOException {
protected HTableInterface createTable() throws IOException {
return null;
}
@ -86,7 +86,7 @@ public class MultiThreadedWriterWithACL extends MultiThreadedWriter {
}
@Override
public void insert(final HTable table, Put put, final long keyBase) {
public void insert(final HTableInterface table, Put put, final long keyBase) {
final long start = System.currentTimeMillis();
try {
put = (Put) dataGenerator.beforeMutate(keyBase, put);
@ -125,7 +125,7 @@ public class MultiThreadedWriterWithACL extends MultiThreadedWriter {
public Object run() throws Exception {
try {
if (table == null) {
table = new HTable(conf, tableName);
table = connection.getTable(tableName);
}
table.put(put);
} catch (IOException e) {
@ -136,7 +136,7 @@ public class MultiThreadedWriterWithACL extends MultiThreadedWriter {
}
}
private void recordFailure(final HTable table, final Put put, final long keyBase,
private void recordFailure(final HTableInterface table, final Put put, final long keyBase,
final long start, IOException e) {
failedKeySet.add(keyBase);
String exceptionInfo;

View File

@ -110,13 +110,13 @@ public class TestMiniClusterLoadSequential {
}
protected MultiThreadedReader prepareReaderThreads(LoadTestDataGenerator dataGen,
Configuration conf, TableName tableName, double verifyPercent) {
Configuration conf, TableName tableName, double verifyPercent) throws IOException {
MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent);
return reader;
}
protected MultiThreadedWriter prepareWriterThreads(LoadTestDataGenerator dataGen,
Configuration conf, TableName tableName) {
Configuration conf, TableName tableName) throws IOException {
MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tableName);
writer.setMultiPut(isMultiPut);
return writer;