HBASE-9155 Enhance LoadTestTool to support updates

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1512817 13f79535-47bb-0310-9956-ffa450edef68
jxiang 2013-08-10 18:53:46 +00:00
parent 71afb594dc
commit 1d4dfa8ddf
9 changed files with 788 additions and 237 deletions
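
Before the per-file hunks, a quick orientation: the new -update option is wired in alongside the existing -write and -read options, so a full load/update/verify cycle can be driven the same way the integration-test change below drives it. The following is a minimal sketch only; the table name, key count, update percentage, and thread counts are illustrative, not values mandated by this commit.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.LoadTestTool;

public class LoadTestUpdateExample {
  public static void main(String[] args) throws Exception {
    LoadTestTool loadTool = new LoadTestTool();
    loadTool.setConf(HBaseConfiguration.create());
    // Phase 1: create the table (if needed) and load 100k rows.
    int ret = loadTool.run(new String[] {
        "-tn", "load_test_tbl",            // illustrative table name
        "-write", "5:100",                 // <avg_cols_per_key>:<avg_data_size>
        "-num_keys", "100000" });
    if (ret != 0) System.exit(ret);        // abort if the load phase failed
    // Phase 2: update roughly 60% of the loaded keys with 8 updater threads,
    // reusing the existing table (-skip_init). Add -batchupdate to batch the
    // per-column mutations of a row.
    ret = loadTool.run(new String[] {
        "-tn", "load_test_tbl",
        "-update", "60:8",                 // <update_percent>[:<#threads>]
        "-num_keys", "100000",
        "-skip_init" });
    if (ret != 0) System.exit(ret);
    // Phase 3: read everything back, verifying 100% of rows with 20 reader
    // threads; a non-zero return code from any phase indicates failures.
    ret = loadTool.run(new String[] {
        "-tn", "load_test_tbl",
        "-read", "100:20",                 // <verify_percent>[:<#threads>]
        "-num_keys", "100000",
        "-skip_init" });
    System.exit(ret);
  }
}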

View File

@ -25,6 +25,14 @@ import java.util.Set;
public abstract class LoadTestDataGenerator {
protected final LoadTestKVGenerator kvGenerator;
// The mutate info column stores information
// about updates done to this column family in this row.
public final static byte[] MUTATE_INFO = "mutate_info".getBytes();
// The increment column always has a long value,
// which can be incremented later on during updates.
public final static byte[] INCREMENT = "increment".getBytes();
/**
* Initializes the object.
* @param minValueSize minimum size of the value generated by

View File

@ -113,6 +113,19 @@ public abstract class IngestIntegrationTestBase {
Assert.fail(errorMsg);
}
ret = loadTool.run(new String[] {
"-tn", tableName,
"-update", String.format("60:%d", writeThreads),
"-start_key", String.valueOf(startKey),
"-num_keys", String.valueOf(numKeys),
"-skip_init"
});
if (0 != ret) {
String errorMsg = "Update failed with error code " + ret;
LOG.error(errorMsg);
Assert.fail(errorMsg);
}
ret = loadTool.run(new String[] {
"-tn", tableName,
"-read", "100:20",

View File

@ -24,8 +24,6 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -44,6 +42,7 @@ import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

View File

@ -24,7 +24,6 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
@ -70,10 +69,14 @@ public class LoadTestTool extends AbstractHBaseTool {
"<avg_cols_per_key>:<avg_data_size>" +
"[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
/** Usa\ge string for the read option */
/** Usage string for the read option */
protected static final String OPT_USAGE_READ =
"<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
/** Usage string for the update option */
protected static final String OPT_USAGE_UPDATE =
"<update_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
protected static final String OPT_USAGE_BLOOM = "Bloom filter type, one of " +
Arrays.toString(BloomType.values());
@ -111,6 +114,8 @@ public class LoadTestTool extends AbstractHBaseTool {
protected static final String OPT_SKIP_INIT = "skip_init";
protected static final String OPT_INIT_ONLY = "init_only";
private static final String NUM_TABLES = "num_tables";
protected static final String OPT_BATCHUPDATE = "batchupdate";
protected static final String OPT_UPDATE = "update";
protected static final long DEFAULT_START_KEY = 0;
@ -119,10 +124,11 @@ public class LoadTestTool extends AbstractHBaseTool {
protected MultiThreadedWriter writerThreads = null;
protected MultiThreadedReader readerThreads = null;
protected MultiThreadedUpdater updaterThreads = null;
protected long startKey, endKey;
protected boolean isWrite, isRead;
protected boolean isWrite, isRead, isUpdate;
// Column family options
protected DataBlockEncoding dataBlockEncodingAlgo;
@ -136,6 +142,11 @@ public class LoadTestTool extends AbstractHBaseTool {
protected int minColDataSize, maxColDataSize;
protected boolean isMultiPut;
// Updater options
protected int numUpdaterThreads = DEFAULT_NUM_THREADS;
protected int updatePercent;
protected boolean isBatchUpdate;
// Reader options
private int numReaderThreads = DEFAULT_NUM_THREADS;
private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
@ -212,6 +223,7 @@ public class LoadTestTool extends AbstractHBaseTool {
addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
addOptWithArg(OPT_READ, OPT_USAGE_READ);
addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading");
addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
@ -225,6 +237,8 @@ public class LoadTestTool extends AbstractHBaseTool {
addOptNoArg(OPT_MULTIPUT, "Whether to use multi-puts as opposed to " +
"separate puts for every column in a row");
addOptNoArg(OPT_BATCHUPDATE, "Whether to use batch as opposed to " +
"separate updates for every column in a row");
addOptNoArg(OPT_ENCODE_IN_CACHE_ONLY, OPT_ENCODE_IN_CACHE_ONLY_USAGE);
addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY);
@ -250,16 +264,17 @@ public class LoadTestTool extends AbstractHBaseTool {
isWrite = cmd.hasOption(OPT_WRITE);
isRead = cmd.hasOption(OPT_READ);
isUpdate = cmd.hasOption(OPT_UPDATE);
isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
if (!isWrite && !isRead && !isInitOnly) {
if (!isWrite && !isRead && !isUpdate && !isInitOnly) {
throw new IllegalArgumentException("Either -" + OPT_WRITE + " or " +
"-" + OPT_READ + " has to be specified");
"-" + OPT_UPDATE + "-" + OPT_READ + " has to be specified");
}
if (isInitOnly && (isRead || isWrite)) {
if (isInitOnly && (isRead || isWrite || isUpdate)) {
throw new IllegalArgumentException(OPT_INIT_ONLY + " cannot be specified with"
+ " either -" + OPT_WRITE + " or -" + OPT_READ);
+ " either -" + OPT_WRITE + " or -" + OPT_UPDATE + " or -" + OPT_READ);
}
if (!isInitOnly) {
@ -303,6 +318,21 @@ public class LoadTestTool extends AbstractHBaseTool {
+ maxColDataSize);
}
if (isUpdate) {
String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 2);
int colIndex = 0;
updatePercent = parseInt(mutateOpts[colIndex++], 0, 100);
if (colIndex < mutateOpts.length) {
numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]);
}
isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE);
System.out.println("Batch updates: " + isBatchUpdate);
System.out.println("Percent of keys to update: " + updatePercent);
System.out.println("Updater threads: " + numUpdaterThreads);
}
if (isRead) {
String[] readOpts = splitColonSeparated(OPT_READ, 1, 2);
int colIndex = 0;
@ -390,16 +420,27 @@ public class LoadTestTool extends AbstractHBaseTool {
writerThreads.setMultiPut(isMultiPut);
}
if (isUpdate) {
updaterThreads = new MultiThreadedUpdater(dataGen, conf, tableName, updatePercent);
updaterThreads.setBatchUpdate(isBatchUpdate);
}
if (isRead) {
readerThreads = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent);
readerThreads.setMaxErrors(maxReadErrors);
readerThreads.setKeyWindow(keyWindow);
}
if (isRead && isWrite) {
LOG.info("Concurrent read/write workload: making readers aware of the " +
if (isUpdate && isWrite) {
LOG.info("Concurrent write/update workload: making updaters aware of the " +
"write point");
readerThreads.linkToWriter(writerThreads);
updaterThreads.linkToWriter(writerThreads);
}
if (isRead && (isUpdate || isWrite)) {
LOG.info("Concurrent write/read workload: making readers aware of the " +
"write point");
readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads);
}
if (isWrite) {
@ -407,6 +448,11 @@ public class LoadTestTool extends AbstractHBaseTool {
writerThreads.start(startKey, endKey, numWriterThreads);
}
if (isUpdate) {
System.out.println("Starting to mutate data...");
updaterThreads.start(startKey, endKey, numUpdaterThreads);
}
if (isRead) {
System.out.println("Starting to read data...");
readerThreads.start(startKey, endKey, numReaderThreads);
@ -416,6 +462,10 @@ public class LoadTestTool extends AbstractHBaseTool {
writerThreads.waitForFinish();
}
if (isUpdate) {
updaterThreads.waitForFinish();
}
if (isRead) {
readerThreads.waitForFinish();
}
@ -424,11 +474,14 @@ public class LoadTestTool extends AbstractHBaseTool {
if (isWrite) {
success = success && writerThreads.getNumWriteFailures() == 0;
}
if (isUpdate) {
success = success && updaterThreads.getNumWriteFailures() == 0;
}
if (isRead) {
success = success && readerThreads.getNumReadErrors() == 0
&& readerThreads.getNumReadFailures() == 0;
}
return success ? EXIT_SUCCESS : this.EXIT_FAILURE;
return success ? EXIT_SUCCESS : EXIT_FAILURE;
}
public static void main(String[] args) {

View File

@ -16,8 +16,13 @@
*/
package org.apache.hadoop.hbase.util;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;
@ -27,12 +32,18 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
/**
* Common base class for reader and writer parts of multi-threaded HBase load
* test ({@link LoadTestTool}).
@ -300,7 +311,7 @@ public abstract class MultiThreadedAction {
// See if we have any data at all.
if (result.isEmpty()) {
LOG.error("No data returned for key = [" + rowKeyStr + "]");
LOG.error("Error checking data for key [" + rowKeyStr + "], no data returned");
return false;
}
@ -311,7 +322,8 @@ public abstract class MultiThreadedAction {
// See if we have all the CFs.
byte[][] expectedCfs = dataGenerator.getColumnFamilies();
if (verifyCfAndColumnIntegrity && (expectedCfs.length != result.getMap().size())) {
LOG.error("Bad family count for [" + rowKeyStr + "]: " + result.getMap().size());
LOG.error("Error checking data for key [" + rowKeyStr
+ "], bad family count: " + result.getMap().size());
return false;
}
@ -320,9 +332,62 @@ public abstract class MultiThreadedAction {
String cfStr = Bytes.toString(cf);
Map<byte[], byte[]> columnValues = result.getFamilyMap(cf);
if (columnValues == null) {
LOG.error("No data for family [" + cfStr + "] for [" + rowKeyStr + "]");
LOG.error("Error checking data for key [" + rowKeyStr
+ "], no data for family [" + cfStr + "]]");
return false;
}
Map<String, MutationType> mutateInfo = null;
if (verifyCfAndColumnIntegrity || verifyValues) {
if (!columnValues.containsKey(MUTATE_INFO)) {
LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
+ cfStr + "], column [" + Bytes.toString(MUTATE_INFO) + "]; value is not found");
return false;
}
long cfHash = Arrays.hashCode(cf);
// Verify deleted columns, and make up column counts if deleted
byte[] mutateInfoValue = columnValues.remove(MUTATE_INFO);
mutateInfo = parseMutateInfo(mutateInfoValue);
for (Map.Entry<String, MutationType> mutate: mutateInfo.entrySet()) {
if (mutate.getValue() == MutationType.DELETE) {
byte[] column = Bytes.toBytes(mutate.getKey());
long columnHash = Arrays.hashCode(column);
long hashCode = cfHash + columnHash;
if (hashCode % 2 == 0) {
if (columnValues.containsKey(column)) {
LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
+ cfStr + "], column [" + mutate.getKey() + "]; should be deleted");
return false;
}
byte[] hashCodeBytes = Bytes.toBytes(hashCode);
columnValues.put(column, hashCodeBytes);
}
}
}
// Verify increment
if (!columnValues.containsKey(INCREMENT)) {
LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
+ cfStr + "], column [" + Bytes.toString(INCREMENT) + "]; value is not found");
return false;
}
long currentValue = Bytes.toLong(columnValues.remove(INCREMENT));
if (verifyValues) {
long amount = mutateInfo.isEmpty() ? 0 : cfHash;
long originalValue = Arrays.hashCode(result.getRow());
long extra = currentValue - originalValue;
if (extra != 0 && (amount == 0 || extra % amount != 0)) {
LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
+ cfStr + "], column [increment], extra [" + extra + "], amount [" + amount + "]");
return false;
}
if (amount != 0 && extra != amount) {
LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family ["
+ cfStr + "], column [increment], incremented [" + (extra / amount) + "] times");
}
}
// See if we have correct columns.
if (verifyCfAndColumnIntegrity
&& !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())) {
@ -333,21 +398,89 @@ public abstract class MultiThreadedAction {
}
colsStr += "[" + Bytes.toString(col) + "]";
}
LOG.error("Bad columns for family [" + cfStr + "] for [" + rowKeyStr + "]: " + colsStr);
LOG.error("Error checking data for key [" + rowKeyStr
+ "], bad columns for family [" + cfStr + "]: " + colsStr);
return false;
}
// See if values check out.
if (verifyValues) {
for (Map.Entry<byte[], byte[]> kv : columnValues.entrySet()) {
if (!dataGenerator.verify(result.getRow(), cf, kv.getKey(), kv.getValue())) {
LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
+ cfStr + "], column [" + Bytes.toString(kv.getKey()) + "]; value of length " +
+ kv.getValue().length);
String column = Bytes.toString(kv.getKey());
MutationType mutation = mutateInfo.get(column);
boolean verificationNeeded = true;
byte[] bytes = kv.getValue();
if (mutation != null) {
boolean mutationVerified = true;
long columnHash = Arrays.hashCode(kv.getKey());
long hashCode = cfHash + columnHash;
byte[] hashCodeBytes = Bytes.toBytes(hashCode);
if (mutation == MutationType.APPEND) {
int offset = bytes.length - hashCodeBytes.length;
mutationVerified = offset > 0 && Bytes.equals(hashCodeBytes,
0, hashCodeBytes.length, bytes, offset, hashCodeBytes.length);
if (mutationVerified) {
int n = 1;
while (true) {
int newOffset = offset - hashCodeBytes.length;
if (newOffset < 0 || !Bytes.equals(hashCodeBytes, 0,
hashCodeBytes.length, bytes, newOffset, hashCodeBytes.length)) {
break;
}
offset = newOffset;
n++;
}
if (n > 1) {
LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family ["
+ cfStr + "], column [" + column + "], appended [" + n + "] times");
}
byte[] dest = new byte[offset];
System.arraycopy(bytes, 0, dest, 0, offset);
bytes = dest;
}
} else if (hashCode % 2 == 0) { // checkAndPut
mutationVerified = Bytes.equals(bytes, hashCodeBytes);
verificationNeeded = false;
}
if (!mutationVerified) {
LOG.error("Error checking data for key [" + rowKeyStr
+ "], mutation checking failed for column family [" + cfStr + "], column ["
+ column + "]; mutation [" + mutation + "], hashCode ["
+ hashCode + "], verificationNeeded ["
+ verificationNeeded + "]");
return false;
}
} // end of mutation checking
if (verificationNeeded &&
!dataGenerator.verify(result.getRow(), cf, kv.getKey(), bytes)) {
LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
+ cfStr + "], column [" + column + "], mutation [" + mutation
+ "]; value of length " + bytes.length);
return false;
}
}
}
}
}
return true;
}
// Parse mutate info into a map of <column name> => <update action>
private Map<String, MutationType> parseMutateInfo(byte[] mutateInfo) {
Map<String, MutationType> mi = new HashMap<String, MutationType>();
if (mutateInfo != null) {
String mutateInfoStr = Bytes.toString(mutateInfo);
String[] mutations = mutateInfoStr.split("#");
for (String mutation: mutations) {
if (mutation.isEmpty()) continue;
Preconditions.checkArgument(mutation.contains(":"),
"Invalid mutation info " + mutation);
int p = mutation.indexOf(":");
String column = mutation.substring(0, p);
MutationType type = MutationType.valueOf(
Integer.parseInt(mutation.substring(p+1)));
mi.put(column, type);
}
}
return mi;
}
}
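
As a note on the format being parsed here: the mutate_info qualifier accumulates one '#'-delimited entry per mutated column, each of the form <column>:<MutationType number> (built by MultiThreadedUpdater further down in this diff). Below is a hedged sketch of the encoding side only, with made-up column names, reusing the MutationType and Bytes imports already present in this file.

  // Hypothetical helper showing the value layout that parseMutateInfo() reverses.
  private static byte[] encodeMutateInfoExample() {
    StringBuilder buf = new StringBuilder();
    buf.append("#").append("col_a").append(":").append(MutationType.PUT.getNumber());
    buf.append("#").append("col_b").append(":").append(MutationType.DELETE.getNumber());
    // buf now looks like "#col_a:<n>#col_b:<m>"; parseMutateInfo() maps it back
    // to {col_a=PUT, col_b=DELETE} via MutationType.valueOf(<number>).
    return Bytes.toBytes(buf.toString());
  }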

View File

@ -18,15 +18,15 @@ package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
@ -41,7 +41,7 @@ public class MultiThreadedReader extends MultiThreadedAction
private final double verifyPercent;
private volatile boolean aborted;
private MultiThreadedWriter writer = null;
private MultiThreadedWriterBase writer = null;
/**
* The number of keys verified in a sequence. This will never be larger than
@ -77,9 +77,9 @@ public class MultiThreadedReader extends MultiThreadedAction
this.verifyPercent = verifyPercent;
}
public void linkToWriter(MultiThreadedWriter writer) {
public void linkToWriter(MultiThreadedWriterBase writer) {
this.writer = writer;
writer.setTrackInsertedKeys(true);
writer.setTrackWroteKeys(true);
}
public void setMaxErrors(int maxErrors) {
@ -108,7 +108,6 @@ public class MultiThreadedReader extends MultiThreadedAction
public class HBaseReaderThread extends Thread {
private final int readerId;
private final HTable table;
private final Random random = new Random();
/** The "current" key being read. Increases from startKey to endKey. */
private long curKey;
@ -182,13 +181,13 @@ public class MultiThreadedReader extends MultiThreadedAction
* constraint.
*/
private long maxKeyWeCanRead() {
long insertedUpToKey = writer.insertedUpToKey();
long insertedUpToKey = writer.wroteUpToKey();
if (insertedUpToKey >= endKey - 1) {
// The writer has finished writing our range, so we can read any
// key in the range.
return endKey - 1;
}
return Math.min(endKey - 1, writer.insertedUpToKey() - keyWindow);
return Math.min(endKey - 1, writer.wroteUpToKey() - keyWindow);
}
private long getNextKeyToRead() {
@ -217,7 +216,7 @@ public class MultiThreadedReader extends MultiThreadedAction
// later. Set a flag to make sure that we don't count this key towards
// the set of unique keys we have verified.
readingRandomKey = true;
return startKey + Math.abs(random.nextLong())
return startKey + Math.abs(RandomUtils.nextLong())
% (maxKeyToRead - startKey + 1);
}
@ -239,7 +238,7 @@ public class MultiThreadedReader extends MultiThreadedAction
if (verbose) {
LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
}
queryKey(get, random.nextInt(100) < verifyPercent);
queryKey(get, RandomUtils.nextInt(100) < verifyPercent);
} catch (IOException e) {
numReadFailures.addAndGet(1);
LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
@ -279,7 +278,7 @@ public class MultiThreadedReader extends MultiThreadedAction
numCols.addAndGet(cols);
} else {
if (writer != null) {
LOG.error("At the time of failure, writer inserted " + writer.numKeys.get() + " keys");
LOG.error("At the time of failure, writer wrote " + writer.numKeys.get() + " keys");
}
numErrorsAfterThis = numReadErrors.incrementAndGet();
}
@ -315,5 +314,4 @@ public class MultiThreadedReader extends MultiThreadedAction
appendToStatus(sb, "READ ERRORS", numReadErrors.get());
return sb.toString();
}
}

View File

@ -0,0 +1,290 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
/** Creates multiple threads that update key/values in the table */
public class MultiThreadedUpdater extends MultiThreadedWriterBase {
private static final Log LOG = LogFactory.getLog(MultiThreadedUpdater.class);
private Set<HBaseUpdaterThread> updaters = new HashSet<HBaseUpdaterThread>();
private MultiThreadedWriterBase writer = null;
private boolean isBatchUpdate = false;
private final double updatePercent;
public MultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf,
TableName tableName, double updatePercent) {
super(dataGen, conf, tableName, "U");
this.updatePercent = updatePercent;
}
/** Use batch vs. separate updates for every column in a row */
public void setBatchUpdate(boolean isBatchUpdate) {
this.isBatchUpdate = isBatchUpdate;
}
public void linkToWriter(MultiThreadedWriterBase writer) {
this.writer = writer;
writer.setTrackWroteKeys(true);
}
@Override
public void start(long startKey, long endKey, int numThreads)
throws IOException {
super.start(startKey, endKey, numThreads);
if (verbose) {
LOG.debug("Updating keys [" + startKey + ", " + endKey + ")");
}
for (int i = 0; i < numThreads; ++i) {
HBaseUpdaterThread updater = new HBaseUpdaterThread(i);
updaters.add(updater);
}
startThreads(updaters);
}
private long getNextKeyToUpdate() {
if (writer == null) {
return nextKeyToWrite.getAndIncrement();
}
synchronized (this) {
if (nextKeyToWrite.get() >= endKey) {
// Finished the whole key range
return endKey;
}
while (nextKeyToWrite.get() > writer.wroteUpToKey()) {
Threads.sleepWithoutInterrupt(100);
}
long k = nextKeyToWrite.getAndIncrement();
if (writer.failedToWriteKey(k)) {
failedKeySet.add(k);
return getNextKeyToUpdate();
}
return k;
}
}
private class HBaseUpdaterThread extends Thread {
private final HTable table;
public HBaseUpdaterThread(int updaterId) throws IOException {
setName(getClass().getSimpleName() + "_" + updaterId);
table = new HTable(conf, tableName);
}
public void run() {
try {
long rowKeyBase;
StringBuilder buf = new StringBuilder();
byte[][] columnFamilies = dataGenerator.getColumnFamilies();
while ((rowKeyBase = getNextKeyToUpdate()) < endKey) {
if (RandomUtils.nextInt(100) < updatePercent) {
byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase);
Increment inc = new Increment(rowKey);
Append app = new Append(rowKey);
numKeys.addAndGet(1);
int columnCount = 0;
for (byte[] cf : columnFamilies) {
long cfHash = Arrays.hashCode(cf);
inc.addColumn(cf, INCREMENT, cfHash);
buf.setLength(0); // Clear the buffer
buf.append("#").append(Bytes.toString(INCREMENT));
buf.append(":").append(MutationType.INCREMENT.getNumber());
app.add(cf, MUTATE_INFO, Bytes.toBytes(buf.toString()));
++columnCount;
if (!isBatchUpdate) {
mutate(table, inc, rowKeyBase);
numCols.addAndGet(1);
inc = new Increment(rowKey);
mutate(table, app, rowKeyBase);
numCols.addAndGet(1);
app = new Append(rowKey);
}
Result result = null;
try {
Get get = new Get(rowKey);
get.addFamily(cf);
result = table.get(get);
} catch (IOException ie) {
LOG.warn("Failed to get the row for key = ["
+ rowKey + "], column family = [" + Bytes.toString(cf) + "]", ie);
}
Map<byte[], byte[]> columnValues =
result != null ? result.getFamilyMap(cf) : null;
if (columnValues == null) {
failedKeySet.add(rowKeyBase);
LOG.error("Failed to update the row with key = ["
+ rowKey + "], since we could not get the original row");
break; // stop processing this row: iterating the null column map below would NPE
}
for (byte[] column : columnValues.keySet()) {
if (Bytes.equals(column, INCREMENT)
|| Bytes.equals(column, MUTATE_INFO)) {
continue;
}
MutationType mt = MutationType.valueOf(
RandomUtils.nextInt(MutationType.values().length));
long columnHash = Arrays.hashCode(column);
long hashCode = cfHash + columnHash;
byte[] hashCodeBytes = Bytes.toBytes(hashCode);
byte[] checkedValue = HConstants.EMPTY_BYTE_ARRAY;
if (hashCode % 2 == 0) {
KeyValue kv = result.getColumnLatest(cf, column);
checkedValue = kv != null ? kv.getValue() : null;
Preconditions.checkNotNull(checkedValue,
"Column value to be checked should not be null");
}
buf.setLength(0); // Clear the buffer
buf.append("#").append(Bytes.toString(column)).append(":");
++columnCount;
switch (mt) {
case PUT:
Put put = new Put(rowKey);
put.add(cf, column, hashCodeBytes);
mutate(table, put, rowKeyBase, rowKey, cf, column, checkedValue);
buf.append(MutationType.PUT.getNumber());
break;
case DELETE:
Delete delete = new Delete(rowKey);
// Delete all versions since a put
// could be called multiple times if CM is used
delete.deleteColumns(cf, column);
mutate(table, delete, rowKeyBase, rowKey, cf, column, checkedValue);
buf.append(MutationType.DELETE.getNumber());
break;
default:
buf.append(MutationType.APPEND.getNumber());
app.add(cf, column, hashCodeBytes);
}
app.add(cf, MUTATE_INFO, Bytes.toBytes(buf.toString()));
if (!isBatchUpdate) {
mutate(table, app, rowKeyBase);
numCols.addAndGet(1);
app = new Append(rowKey);
}
}
}
if (isBatchUpdate) {
if (verbose) {
LOG.debug("Preparing increment and append for key = ["
+ rowKey + "], " + columnCount + " columns");
}
mutate(table, inc, rowKeyBase);
mutate(table, app, rowKeyBase);
numCols.addAndGet(columnCount);
}
}
if (trackWroteKeys) {
wroteKeys.add(rowKeyBase);
}
}
} finally {
try {
table.close();
} catch (IOException e) {
LOG.error("Error closing table", e);
}
numThreadsWorking.decrementAndGet();
}
}
}
@Override
public void waitForFinish() {
super.waitForFinish();
System.out.println("Failed to update keys: " + failedKeySet.size());
for (Long key : failedKeySet) {
System.out.println("Failed to update key: " + key);
}
}
public void mutate(HTable table, Mutation m, long keyBase) {
mutate(table, m, keyBase, null, null, null, null);
}
public void mutate(HTable table, Mutation m,
long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) {
long start = System.currentTimeMillis();
try {
if (m instanceof Increment) {
table.increment((Increment)m);
} else if (m instanceof Append) {
table.append((Append)m);
} else if (m instanceof Put) {
table.checkAndPut(row, cf, q, v, (Put)m);
} else if (m instanceof Delete) {
table.checkAndDelete(row, cf, q, v, (Delete)m);
} else {
throw new IllegalArgumentException(
"unsupported mutation " + m.getClass().getSimpleName());
}
totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
} catch (IOException e) {
failedKeySet.add(keyBase);
String exceptionInfo;
if (e instanceof RetriesExhaustedWithDetailsException) {
RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e;
exceptionInfo = aggEx.getExhaustiveDescription();
} else {
StringWriter stackWriter = new StringWriter();
PrintWriter pw = new PrintWriter(stackWriter);
e.printStackTrace(pw);
pw.flush();
exceptionInfo = StringUtils.stringifyException(e);
}
LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start) +
"ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: "
+ exceptionInfo);
}
}
}

View File

@ -18,70 +18,31 @@
package org.apache.hadoop.hbase.util;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
/** Creates multiple threads that write key/values into the table */
public class MultiThreadedWriter extends MultiThreadedAction {
public class MultiThreadedWriter extends MultiThreadedWriterBase {
private static final Log LOG = LogFactory.getLog(MultiThreadedWriter.class);
private Set<HBaseWriterThread> writers = new HashSet<HBaseWriterThread>();
private boolean isMultiPut = false;
/**
* A temporary place to keep track of inserted keys. This is written to by
* all writers and is drained on a separate thread that populates
* {@link #insertedUpToKey}, the maximum key in the contiguous range of keys
* being inserted. This queue is supposed to stay small.
*/
private BlockingQueue<Long> insertedKeys = new ArrayBlockingQueue<Long>(10000);
/**
* This is the current key to be inserted by any thread. Each thread does an
* atomic get and increment operation and inserts the current value.
*/
private AtomicLong nextKeyToInsert = new AtomicLong();
/**
* The highest key in the contiguous range of keys inserted so far.
*/
private AtomicLong insertedUpToKey = new AtomicLong();
/** The sorted set of keys NOT inserted by the writers */
private Set<Long> failedKeySet = new ConcurrentSkipListSet<Long>();
/**
* The total size of the temporary inserted key set that has not yet lined
* up in our contiguous sequence starting from startKey. Supposed to stay
* small.
*/
private AtomicLong insertedKeyQueueSize = new AtomicLong();
/** Enable this if used in conjunction with a concurrent reader. */
private boolean trackInsertedKeys;
public MultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf,
TableName tableName) {
super(dataGen, conf, tableName, "W");
@ -101,19 +62,11 @@ public class MultiThreadedWriter extends MultiThreadedAction {
LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")");
}
nextKeyToInsert.set(startKey);
insertedUpToKey.set(startKey - 1);
for (int i = 0; i < numThreads; ++i) {
HBaseWriterThread writer = new HBaseWriterThread(i);
writers.add(writer);
}
if (trackInsertedKeys) {
new Thread(new InsertedKeysTracker()).start();
numThreadsWorking.incrementAndGet();
}
startThreads(writers);
}
@ -129,13 +82,12 @@ public class MultiThreadedWriter extends MultiThreadedAction {
try {
long rowKeyBase;
byte[][] columnFamilies = dataGenerator.getColumnFamilies();
while ((rowKeyBase = nextKeyToInsert.getAndIncrement()) < endKey) {
while ((rowKeyBase = nextKeyToWrite.getAndIncrement()) < endKey) {
byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase);
Put put = new Put(rowKey);
numKeys.addAndGet(1);
int columnCount = 0;
for (byte[] cf : columnFamilies) {
String s;
byte[][] columns = dataGenerator.generateColumnsForCf(rowKey, cf);
for (byte[] column : columns) {
byte[] value = dataGenerator.generateValue(rowKey, cf, column);
@ -147,6 +99,14 @@ public class MultiThreadedWriter extends MultiThreadedAction {
put = new Put(rowKey);
}
}
long rowKeyHash = Arrays.hashCode(rowKey);
put.add(cf, MUTATE_INFO, HConstants.EMPTY_BYTE_ARRAY);
put.add(cf, INCREMENT, Bytes.toBytes(rowKeyHash));
if (!isMultiPut) {
insert(table, put, rowKeyBase);
numCols.addAndGet(1);
put = new Put(rowKey);
}
}
if (isMultiPut) {
if (verbose) {
@ -155,8 +115,8 @@ public class MultiThreadedWriter extends MultiThreadedAction {
insert(table, put, rowKeyBase);
numCols.addAndGet(columnCount);
}
if (trackInsertedKeys) {
insertedKeys.add(rowKeyBase);
if (trackWroteKeys) {
wroteKeys.add(rowKeyBase);
}
}
} finally {
@ -170,104 +130,6 @@ public class MultiThreadedWriter extends MultiThreadedAction {
}
}
public void insert(HTable table, Put put, long keyBase) {
long start = System.currentTimeMillis();
try {
table.put(put);
totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
} catch (IOException e) {
failedKeySet.add(keyBase);
String exceptionInfo;
if (e instanceof RetriesExhaustedWithDetailsException) {
RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e;
exceptionInfo = aggEx.getExhaustiveDescription();
} else {
StringWriter stackWriter = new StringWriter();
PrintWriter pw = new PrintWriter(stackWriter);
e.printStackTrace(pw);
pw.flush();
exceptionInfo = StringUtils.stringifyException(e);
}
LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start) +
"ms; region information: " + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: "
+ exceptionInfo);
}
}
private String getRegionDebugInfoSafe(HTable table, byte[] rowKey) {
HRegionLocation cached = null, real = null;
try {
cached = table.getRegionLocation(rowKey, false);
real = table.getRegionLocation(rowKey, true);
} catch (Throwable t) {
// Cannot obtain region information for another catch block - too bad!
}
String result = "no information can be obtained";
if (cached != null) {
result = "cached: " + cached.toString();
}
if (real != null) {
if (real.equals(cached)) {
result += "; cache is up to date";
} else {
result = (cached != null) ? (result + "; ") : "";
result += "real: " + real.toString();
}
}
return result;
}
/**
* A thread that keeps track of the highest key in the contiguous range of
* inserted keys.
*/
private class InsertedKeysTracker implements Runnable {
@Override
public void run() {
Thread.currentThread().setName(getClass().getSimpleName());
try {
long expectedKey = startKey;
Queue<Long> sortedKeys = new PriorityQueue<Long>();
while (expectedKey < endKey) {
// Block until a new element is available.
Long k;
try {
k = insertedKeys.poll(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.info("Inserted key tracker thread interrupted", e);
break;
}
if (k == null) {
continue;
}
if (k == expectedKey) {
// Skip the "sorted key" queue and consume this key.
insertedUpToKey.set(k);
++expectedKey;
} else {
sortedKeys.add(k);
}
// See if we have a sequence of contiguous keys lined up.
while (!sortedKeys.isEmpty()
&& ((k = sortedKeys.peek()) == expectedKey)) {
sortedKeys.poll();
insertedUpToKey.set(k);
++expectedKey;
}
insertedKeyQueueSize.set(insertedKeys.size() + sortedKeys.size());
}
} catch (Exception ex) {
LOG.error("Error in inserted key tracker", ex);
} finally {
numThreadsWorking.decrementAndGet();
}
}
}
@Override
public void waitForFinish() {
super.waitForFinish();
@ -276,37 +138,4 @@ public class MultiThreadedWriter extends MultiThreadedAction {
System.out.println("Failed to write key: " + key);
}
}
public int getNumWriteFailures() {
return failedKeySet.size();
}
/**
* The max key until which all keys have been inserted (successfully or not).
* @return the last key that we have inserted all keys up to (inclusive)
*/
public long insertedUpToKey() {
return insertedUpToKey.get();
}
public boolean failedToWriteKey(long k) {
return failedKeySet.contains(k);
}
@Override
protected String progressInfo() {
StringBuilder sb = new StringBuilder();
appendToStatus(sb, "insertedUpTo", insertedUpToKey.get());
appendToStatus(sb, "insertedQSize", insertedKeyQueueSize.get());
return sb.toString();
}
/**
* Used for a joint write/read workload. Enables tracking the last inserted
* key, which requires a blocking queue and a consumer thread.
* @param enable whether to enable tracking the last inserted key
*/
public void setTrackInsertedKeys(boolean enable) {
trackInsertedKeys = enable;
}
}

View File

@ -0,0 +1,228 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
/** Creates multiple threads that write key/values into the table */
public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
private static final Log LOG = LogFactory.getLog(MultiThreadedWriterBase.class);
/**
* A temporary place to keep track of inserted/updated keys. This is written to by
* all writers and is drained on a separate thread that populates
* {@link #wroteUpToKey}, the maximum key in the contiguous range of keys
* being inserted/updated. This queue is supposed to stay small.
*/
protected BlockingQueue<Long> wroteKeys = new ArrayBlockingQueue<Long>(10000);
/**
* This is the current key to be inserted/updated by any thread. Each thread does an
* atomic get and increment operation and inserts the current value.
*/
protected AtomicLong nextKeyToWrite = new AtomicLong();
/**
* The highest key in the contiguous range of keys written so far.
*/
protected AtomicLong wroteUpToKey = new AtomicLong();
/** The sorted set of keys NOT inserted/updated by the writers */
protected Set<Long> failedKeySet = new ConcurrentSkipListSet<Long>();
/**
* The total size of the temporary inserted/updated key set that has not yet lined
* up in our contiguous sequence starting from startKey. Supposed to stay
* small.
*/
protected AtomicLong wroteKeyQueueSize = new AtomicLong();
/** Enable this if used in conjunction with a concurrent reader. */
protected boolean trackWroteKeys;
public MultiThreadedWriterBase(LoadTestDataGenerator dataGen, Configuration conf,
TableName tableName, String actionLetter) {
super(dataGen, conf, tableName, actionLetter);
}
@Override
public void start(long startKey, long endKey, int numThreads)
throws IOException {
super.start(startKey, endKey, numThreads);
nextKeyToWrite.set(startKey);
wroteUpToKey.set(startKey - 1);
if (trackWroteKeys) {
new Thread(new WroteKeysTracker()).start();
numThreadsWorking.incrementAndGet();
}
}
protected String getRegionDebugInfoSafe(HTable table, byte[] rowKey) {
HRegionLocation cached = null, real = null;
try {
cached = table.getRegionLocation(rowKey, false);
real = table.getRegionLocation(rowKey, true);
} catch (Throwable t) {
// Cannot obtain region information for another catch block - too bad!
}
String result = "no information can be obtained";
if (cached != null) {
result = "cached: " + cached.toString();
}
if (real != null) {
if (real.equals(cached)) {
result += "; cache is up to date";
} else {
result = (cached != null) ? (result + "; ") : "";
result += "real: " + real.toString();
}
}
return result;
}
/**
* A thread that keeps track of the highest key in the contiguous range of
* inserted/updated keys.
*/
private class WroteKeysTracker implements Runnable {
@Override
public void run() {
Thread.currentThread().setName(getClass().getSimpleName());
try {
long expectedKey = startKey;
Queue<Long> sortedKeys = new PriorityQueue<Long>();
while (expectedKey < endKey) {
// Block until a new element is available.
Long k;
try {
k = wroteKeys.poll(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.info("Inserted key tracker thread interrupted", e);
break;
}
if (k == null) {
continue;
}
if (k == expectedKey) {
// Skip the "sorted key" queue and consume this key.
wroteUpToKey.set(k);
++expectedKey;
} else {
sortedKeys.add(k);
}
// See if we have a sequence of contiguous keys lined up.
while (!sortedKeys.isEmpty()
&& ((k = sortedKeys.peek()) == expectedKey)) {
sortedKeys.poll();
wroteUpToKey.set(k);
++expectedKey;
}
wroteKeyQueueSize.set(wroteKeys.size() + sortedKeys.size());
}
} catch (Exception ex) {
LOG.error("Error in inserted/updaed key tracker", ex);
} finally {
numThreadsWorking.decrementAndGet();
}
}
}
public int getNumWriteFailures() {
return failedKeySet.size();
}
public void insert(HTable table, Put put, long keyBase) {
long start = System.currentTimeMillis();
try {
table.put(put);
totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
} catch (IOException e) {
failedKeySet.add(keyBase);
String exceptionInfo;
if (e instanceof RetriesExhaustedWithDetailsException) {
RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e;
exceptionInfo = aggEx.getExhaustiveDescription();
} else {
StringWriter stackWriter = new StringWriter();
PrintWriter pw = new PrintWriter(stackWriter);
e.printStackTrace(pw);
pw.flush();
exceptionInfo = StringUtils.stringifyException(e);
}
LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start) +
"ms; region information: " + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: "
+ exceptionInfo);
}
}
/**
* The max key until which all keys have been inserted/updated (successfully or not).
* @return the last key that we have inserted/updated all keys up to (inclusive)
*/
public long wroteUpToKey() {
return wroteUpToKey.get();
}
public boolean failedToWriteKey(long k) {
return failedKeySet.contains(k);
}
@Override
protected String progressInfo() {
StringBuilder sb = new StringBuilder();
appendToStatus(sb, "wroteUpTo", wroteUpToKey.get());
appendToStatus(sb, "wroteQSize", wroteKeyQueueSize.get());
return sb.toString();
}
/**
* Used for a joint write/read workload. Enables tracking the last inserted/updated
* key, which requires a blocking queue and a consumer thread.
* @param enable whether to enable tracking the last inserted/updated key
*/
public void setTrackWroteKeys(boolean enable) {
trackWroteKeys = enable;
}
}
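
Finally, to summarize how LoadTestTool.doWork() (earlier hunk) chains these pools when -write, -update, and -read are all given: updaters trail the writer's contiguous write point, and readers trail the last stage that mutates data. The fragment below is a minimal sketch, assuming dataGen, conf, tableName, and numKeys are already in scope and that the enclosing method may throw IOException; the percentages and thread counts are arbitrary examples.

  MultiThreadedWriter writerThreads = new MultiThreadedWriter(dataGen, conf, tableName);
  MultiThreadedUpdater updaterThreads = new MultiThreadedUpdater(dataGen, conf, tableName, 60);
  MultiThreadedReader readerThreads = new MultiThreadedReader(dataGen, conf, tableName, 100);
  // Updaters block until the writer's contiguous write point reaches the key they want.
  updaterThreads.linkToWriter(writerThreads);
  // Readers stay a key window behind the updater, the last stage that mutates data.
  readerThreads.linkToWriter(updaterThreads);
  writerThreads.start(0, numKeys, 20);
  updaterThreads.start(0, numKeys, 8);
  readerThreads.start(0, numKeys, 20);
  writerThreads.waitForFinish();
  updaterThreads.waitForFinish();
  readerThreads.waitForFinish();
  // Same success criteria as LoadTestTool.doWork().
  boolean success = writerThreads.getNumWriteFailures() == 0
      && updaterThreads.getNumWriteFailures() == 0
      && readerThreads.getNumReadErrors() == 0
      && readerThreads.getNumReadFailures() == 0;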