HBASE-7383 create integration test for HBASE-5416 (improving scan performance for certain filters) (Sergey)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1433224 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2013-01-14 23:57:48 +00:00
parent 9575f0f0a0
commit 96c32807dd
12 changed files with 659 additions and 176 deletions

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.util.test;
import java.util.Set;
/**
* A generator of random data (keys/cfs/columns/values) for load testing.
* Contains LoadTestKVGenerator as a matter of convenience...
*/
public abstract class LoadTestDataGenerator {
protected final LoadTestKVGenerator kvGenerator;
/**
* Initializes the object.
* @param minValueSize minimum size of the value generated by
* {@link #generateValue(byte[], byte[], byte[])}.
* @param maxValueSize maximum size of the value generated by
* {@link #generateValue(byte[], byte[], byte[])}.
*/
public LoadTestDataGenerator(int minValueSize, int maxValueSize) {
this.kvGenerator = new LoadTestKVGenerator(minValueSize, maxValueSize);
}
/**
* Generates a deterministic, unique hashed row key from a number. That way, the user can
* keep track of numbers, without messing with byte array and ensuring key distribution.
* @param keyBase Base number for a key, such as a loop counter.
*/
public abstract byte[] getDeterministicUniqueKey(long keyBase);
/**
* Gets column families for the load test table.
* @return The array of byte[]s representing column family names.
*/
public abstract byte[][] getColumnFamilies();
/**
* Generates an applicable set of columns to be used for a particular key and family.
* @param rowKey The row key to generate for.
* @param cf The column family name to generate for.
* @return The array of byte[]s representing column names.
*/
public abstract byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf);
/**
* Generates a value to be used for a particular row/cf/column.
* @param rowKey The row key to generate for.
* @param cf The column family name to generate for.
* @param column The column name to generate for.
* @return The value to use.
*/
public abstract byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column);
/**
* Checks that columns for a rowKey and cf are valid if generated via
* {@link #generateColumnsForCf(byte[], byte[])}
* @param rowKey The row key to verify for.
* @param cf The column family name to verify for.
* @param columnSet The column set (for example, encountered by read).
* @return True iff valid.
*/
public abstract boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet);
/**
* Checks that value for a rowKey/cf/column is valid if generated via
* {@link #generateValue(byte[], byte[], byte[])}
* @param rowKey The row key to verify for.
* @param cf The column family name to verify for.
* @param column The column name to verify for.
* @param value The value (for example, encountered by read).
* @return True iff valid.
*/
public abstract boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value);
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.hadoop.hbase.util.test;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.hbase.util.Bytes;
@ -27,8 +28,6 @@ import org.apache.hadoop.hbase.util.MD5Hash;
* hash. Values are generated by selecting value size in the configured range
* and generating a pseudo-random sequence of bytes seeded by key, column
* qualifier, and value size.
* <p>
* Not thread-safe, so a separate instance is needed for every writer thread/
*/
public class LoadTestKVGenerator {
@ -49,13 +48,13 @@ public class LoadTestKVGenerator {
/**
* Verifies that the given byte array is the same as what would be generated
* for the given row key and qualifier. We are assuming that the value size
* is correct, and only verify the actual bytes. However, if the min/max
* value sizes are set sufficiently high, an accidental match should be
* for the given seed strings (row/cf/column/...). We are assuming that the
* value size is correct, and only verify the actual bytes. However, if the
* min/max value sizes are set sufficiently high, an accidental match should be
* extremely improbable.
*/
public static boolean verify(String rowKey, String qual, byte[] value) {
byte[] expectedData = getValueForRowColumn(rowKey, qual, value.length);
public static boolean verify(byte[] value, byte[]... seedStrings) {
byte[] expectedData = getValueForRowColumn(value.length, seedStrings);
return Bytes.equals(expectedData, value);
}
@ -74,27 +73,31 @@ public class LoadTestKVGenerator {
/**
* Generates a value for the given key index and column qualifier. Size is
* selected randomly in the configured range. The generated value depends
* only on the combination of the key, qualifier, and the selected value
* size. This allows to verify the actual value bytes when reading, as done
* in {@link #verify(String, String, byte[])}.
* only on the combination of the strings passed (key/cf/column/...) and the selected
* value size. This allows to verify the actual value bytes when reading, as done
* in {#verify(byte[], byte[]...)}
* This method is as thread-safe as Random class. It appears that the worst bug ever
* found with the latter is that multiple threads will get some duplicate values, which
* we don't care about.
*/
public byte[] generateRandomSizeValue(long key, String qual) {
String rowKey = md5PrefixedKey(key);
public byte[] generateRandomSizeValue(byte[]... seedStrings) {
int dataSize = minValueSize;
if(minValueSize != maxValueSize){
if(minValueSize != maxValueSize) {
dataSize = minValueSize + randomForValueSize.nextInt(Math.abs(maxValueSize - minValueSize));
}
return getValueForRowColumn(rowKey, qual, dataSize);
return getValueForRowColumn(dataSize, seedStrings);
}
/**
* Generates random bytes of the given size for the given row and column
* qualifier. The random seed is fully determined by these parameters.
*/
private static byte[] getValueForRowColumn(String rowKey, String qual,
int dataSize) {
Random seededRandom = new Random(rowKey.hashCode() + qual.hashCode() +
dataSize);
private static byte[] getValueForRowColumn(int dataSize, byte[]... seedStrings) {
long seed = dataSize;
for (byte[] str : seedStrings) {
seed += Bytes.toString(str).hashCode();
}
Random seededRandom = new Random(seed);
byte[] randomBytes = new byte[dataSize];
seededRandom.nextBytes(randomBytes);
return randomBytes;

View File

@ -41,8 +41,8 @@ public class TestLoadTestKVGenerator {
@Test
public void testValueLength() {
for (int i = 0; i < 1000; ++i) {
byte[] v = gen.generateRandomSizeValue(i,
String.valueOf(rand.nextInt()));
byte[] v = gen.generateRandomSizeValue(Integer.toString(i).getBytes(),
String.valueOf(rand.nextInt()).getBytes());
assertTrue(MIN_LEN <= v.length);
assertTrue(v.length <= MAX_LEN);
}
@ -52,12 +52,12 @@ public class TestLoadTestKVGenerator {
public void testVerification() {
for (int i = 0; i < 1000; ++i) {
for (int qualIndex = 0; qualIndex < 20; ++qualIndex) {
String qual = String.valueOf(qualIndex);
byte[] v = gen.generateRandomSizeValue(i, qual);
String rowKey = LoadTestKVGenerator.md5PrefixedKey(i);
assertTrue(LoadTestKVGenerator.verify(rowKey, qual, v));
byte[] qual = String.valueOf(qualIndex).getBytes();
byte[] rowKey = LoadTestKVGenerator.md5PrefixedKey(i).getBytes();
byte[] v = gen.generateRandomSizeValue(rowKey, qual);
assertTrue(LoadTestKVGenerator.verify(v, rowKey, qual));
v[0]++;
assertFalse(LoadTestKVGenerator.verify(rowKey, qual, v));
assertFalse(LoadTestKVGenerator.verify(v, rowKey, qual));
}
}
}

View File

@ -0,0 +1,286 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.security.InvalidParameterException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.hbase.util.MultiThreadedWriter;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Integration test that verifies lazy CF loading during scans by doing repeated scans
* with this feature while multiple threads are continuously writing values; and
* verifying the result.
*/
@Category(IntegrationTests.class)
public class IntegrationTestLazyCfLoading {
private static final String TABLE_NAME = IntegrationTestLazyCfLoading.class.getSimpleName();
private static final String TIMEOUT_KEY = "hbase.%s.timeout";
/** A soft test timeout; duration of the test, as such, depends on number of keys to put. */
private static final int DEFAULT_TIMEOUT_MINUTES = 10;
private static final int NUM_SERVERS = 1;
/** Set regions per server low to ensure splits happen during test */
private static final int REGIONS_PER_SERVER = 3;
private static final int KEYS_TO_WRITE_PER_SERVER = 20000;
private static final int WRITER_THREADS = 10;
private static final int WAIT_BETWEEN_SCANS_MS = 1000;
private static final Log LOG = LogFactory.getLog(IntegrationTestLazyCfLoading.class);
private IntegrationTestingUtility util = new IntegrationTestingUtility();
private final DataGenerator dataGen = new DataGenerator();
/** Custom LoadTestDataGenerator. Uses key generation and verification from
* LoadTestKVGenerator. Creates 3 column families; one with an integer column to
* filter on, the 2nd one with an integer column that matches the first integer column (for
* test-specific verification), and byte[] value that is used for general verification; and
* the third one with just the value.
*/
private static class DataGenerator extends LoadTestDataGenerator {
private static final int MIN_DATA_SIZE = 4096;
private static final int MAX_DATA_SIZE = 65536;
public static final byte[] ESSENTIAL_CF = Bytes.toBytes("essential");
public static final byte[] JOINED_CF1 = Bytes.toBytes("joined");
public static final byte[] JOINED_CF2 = Bytes.toBytes("joined2");
public static final byte[] FILTER_COLUMN = Bytes.toBytes("filter");
public static final byte[] VALUE_COLUMN = Bytes.toBytes("val");
public static final long ACCEPTED_VALUE = 1L;
private static final Map<byte[], byte[][]> columnMap = new TreeMap<byte[], byte[][]>(
Bytes.BYTES_COMPARATOR);
private final AtomicLong expectedNumberOfKeys = new AtomicLong(0);
private final AtomicLong totalNumberOfKeys = new AtomicLong(0);
public DataGenerator() {
super(MIN_DATA_SIZE, MAX_DATA_SIZE);
columnMap.put(ESSENTIAL_CF, new byte[][] { FILTER_COLUMN });
columnMap.put(JOINED_CF1, new byte[][] { FILTER_COLUMN, VALUE_COLUMN });
columnMap.put(JOINED_CF2, new byte[][] { VALUE_COLUMN });
}
public long getExpectedNumberOfKeys() {
return expectedNumberOfKeys.get();
}
public long getTotalNumberOfKeys() {
return totalNumberOfKeys.get();
}
@Override
public byte[] getDeterministicUniqueKey(long keyBase) {
return LoadTestKVGenerator.md5PrefixedKey(keyBase).getBytes();
}
@Override
public byte[][] getColumnFamilies() {
return columnMap.keySet().toArray(new byte[columnMap.size()][]);
}
@Override
public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
return columnMap.get(cf);
}
@Override
public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
if (Bytes.BYTES_COMPARATOR.compare(column, FILTER_COLUMN) == 0) {
// Random deterministic way to make some values "on" and others "off" for filters.
long value = Long.parseLong(Bytes.toString(rowKey, 0, 4), 16) & ACCEPTED_VALUE;
if (Bytes.BYTES_COMPARATOR.compare(cf, ESSENTIAL_CF) == 0) {
totalNumberOfKeys.incrementAndGet();
if (value == ACCEPTED_VALUE) {
expectedNumberOfKeys.incrementAndGet();
}
}
return Bytes.toBytes(value);
} else if (Bytes.BYTES_COMPARATOR.compare(column, VALUE_COLUMN) == 0) {
return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
}
String error = "Unknown column " + Bytes.toString(column);
assert false : error;
throw new InvalidParameterException(error);
}
@Override
public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
if (Bytes.BYTES_COMPARATOR.compare(column, FILTER_COLUMN) == 0) {
// Relies on the filter from getScanFilter being used.
return Bytes.toLong(value) == ACCEPTED_VALUE;
} else if (Bytes.BYTES_COMPARATOR.compare(column, VALUE_COLUMN) == 0) {
return LoadTestKVGenerator.verify(value, rowKey, cf, column);
}
return false; // some bogus value from read, we don't expect any such thing.
}
@Override
public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
return columnMap.get(cf).length == columnSet.size();
}
public Filter getScanFilter() {
SingleColumnValueFilter scf = new SingleColumnValueFilter(ESSENTIAL_CF, FILTER_COLUMN,
CompareFilter.CompareOp.EQUAL, Bytes.toBytes(ACCEPTED_VALUE));
scf.setFilterIfMissing(true);
return scf;
}
};
@Before
public void setUp() throws Exception {
LOG.info("Initializing cluster with " + NUM_SERVERS + " servers");
util.initializeCluster(NUM_SERVERS);
LOG.info("Done initializing cluster");
createTable();
}
private void createTable() throws Exception {
deleteTable();
LOG.info("Creating table");
HTableDescriptor htd = new HTableDescriptor(Bytes.toBytes(TABLE_NAME));
for (byte[] cf : dataGen.getColumnFamilies()) {
htd.addFamily(new HColumnDescriptor(cf));
}
int serverCount = util.getHBaseClusterInterface().getClusterStatus().getServersSize();
byte[][] splits = new RegionSplitter.HexStringSplit().split(serverCount * REGIONS_PER_SERVER);
util.getHBaseAdmin().createTable(htd, splits);
LOG.info("Created table");
}
private void deleteTable() throws Exception {
if (util.getHBaseAdmin().tableExists(TABLE_NAME)) {
LOG.info("Deleting table");
if (!util.getHBaseAdmin().isTableDisabled(TABLE_NAME)) {
util.getHBaseAdmin().disableTable(TABLE_NAME);
}
util.getHBaseAdmin().deleteTable(TABLE_NAME);
LOG.info("Deleted table");
}
}
@After
public void tearDown() throws Exception {
deleteTable();
LOG.info("Restoring the cluster");
util.restoreCluster();
LOG.info("Done restoring the cluster");
}
@Test
public void testReadersAndWriters() throws Exception {
Configuration conf = util.getConfiguration();
String timeoutKey = String.format(TIMEOUT_KEY, this.getClass().getSimpleName());
long maxRuntime = conf.getLong(timeoutKey, DEFAULT_TIMEOUT_MINUTES);
long serverCount = util.getHBaseClusterInterface().getClusterStatus().getServersSize();
long keysToWrite = serverCount * KEYS_TO_WRITE_PER_SERVER;
HTable table = new HTable(conf, Bytes.toBytes(TABLE_NAME));
// Create multi-threaded writer and start it. We write multiple columns/CFs and verify
// their integrity, therefore multi-put is necessary.
MultiThreadedWriter writer =
new MultiThreadedWriter(dataGen, conf, Bytes.toBytes(TABLE_NAME));
writer.setMultiPut(true);
LOG.info("Starting writer; the number of keys to write is " + keysToWrite);
writer.start(1, keysToWrite, WRITER_THREADS);
// Now, do scans.
long now = EnvironmentEdgeManager.currentTimeMillis();
long timeLimit = now + (maxRuntime * 60000);
boolean isWriterDone = false;
while (now < timeLimit && !isWriterDone) {
LOG.info("Starting the scan; wrote approximately "
+ dataGen.getTotalNumberOfKeys() + " keys");
isWriterDone = writer.isDone();
if (isWriterDone) {
LOG.info("Scanning full result, writer is done");
}
Scan scan = new Scan();
for (byte[] cf : dataGen.getColumnFamilies()) {
scan.addFamily(cf);
}
scan.setFilter(dataGen.getScanFilter());
scan.setLoadColumnFamiliesOnDemand(true);
// The number of keys we can expect from scan - lower bound (before scan).
// Not a strict lower bound - writer knows nothing about filters, so we report
// this from generator. Writer might have generated the value but not put it yet.
long onesGennedBeforeScan = dataGen.getExpectedNumberOfKeys();
long startTs = EnvironmentEdgeManager.currentTimeMillis();
ResultScanner results = table.getScanner(scan);
long resultCount = 0;
Result result = null;
// Verify and count the results.
while ((result = results.next()) != null) {
boolean isOk = writer.verifyResultAgainstDataGenerator(result, true, true);
Assert.assertTrue("Failed to verify [" + Bytes.toString(result.getRow())+ "]", isOk);
++resultCount;
}
long timeTaken = EnvironmentEdgeManager.currentTimeMillis() - startTs;
// Verify the result count.
long onesGennedAfterScan = dataGen.getExpectedNumberOfKeys();
Assert.assertTrue("Read " + resultCount + " keys when at most " + onesGennedAfterScan
+ " were generated ", onesGennedAfterScan >= resultCount);
if (isWriterDone) {
Assert.assertTrue("Read " + resultCount + " keys; the writer is done and "
+ onesGennedAfterScan + " keys were generated", onesGennedAfterScan == resultCount);
} else if (onesGennedBeforeScan * 0.9 > resultCount) {
LOG.warn("Read way too few keys (" + resultCount + "/" + onesGennedBeforeScan
+ ") - there might be a problem, or the writer might just be slow");
}
LOG.info("Scan took " + timeTaken + "ms");
if (!isWriterDone) {
Thread.sleep(WAIT_BETWEEN_SCANS_MS);
now = EnvironmentEdgeManager.currentTimeMillis();
}
}
Assert.assertEquals("There are write failures", 0, writer.getNumWriteFailures());
Assert.assertTrue("Writer is not done", isWriterDone);
// Assert.fail("Boom!");
}
}

View File

@ -3641,7 +3641,7 @@ public class HRegion implements HeapSize { // , Writable{
// First, check if we are at a stop row. If so, there are no more results.
if (stopRow) {
if (filter != null && filter.hasFilterRow()) {
filter.filterRow(results);
filter.filterRow(results);
}
return false;
}
@ -3670,7 +3670,7 @@ public class HRegion implements HeapSize { // , Writable{
final boolean isEmptyRow = results.isEmpty();
// We have the part of the row necessary for filtering (all of it, usually).
// First filter with the filterRow(List).
// First filter with the filterRow(List).
if (filter != null && filter.hasFilterRow()) {
filter.filterRow(results);
}

View File

@ -129,13 +129,12 @@ public class TestEncodedSeekers {
private void doPuts(HRegion region) throws IOException{
LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator(MIN_VALUE_SIZE, MAX_VALUE_SIZE);
for (int i = 0; i < NUM_ROWS; ++i) {
byte[] key = MultiThreadedWriter.longToByteArrayKey(i);
byte[] key = LoadTestKVGenerator.md5PrefixedKey(i).getBytes();
for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
Put put = new Put(key);
String colAsStr = String.valueOf(j);
byte[] col = Bytes.toBytes(colAsStr);
byte[] value = dataGenerator.generateRandomSizeValue(i, colAsStr);
put.add(CF_BYTES, Bytes.toBytes(colAsStr), value);
byte[] col = Bytes.toBytes(String.valueOf(j));
byte[] value = dataGenerator.generateRandomSizeValue(key, col);
put.add(CF_BYTES, col, value);
if(VERBOSE){
KeyValue kvPut = new KeyValue(key, CF_BYTES, col, value);
System.err.println(Strings.padFront(i+"", ' ', 4)+" "+kvPut);
@ -151,7 +150,7 @@ public class TestEncodedSeekers {
private void doGets(HRegion region) throws IOException{
for (int i = 0; i < NUM_ROWS; ++i) {
final byte[] rowKey = MultiThreadedWriter.longToByteArrayKey(i);
final byte[] rowKey = LoadTestKVGenerator.md5PrefixedKey(i).getBytes();
for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
final String qualStr = String.valueOf(j);
if (VERBOSE) {
@ -163,8 +162,8 @@ public class TestEncodedSeekers {
get.addColumn(CF_BYTES, qualBytes);
Result result = region.get(get, null);
assertEquals(1, result.size());
assertTrue(LoadTestKVGenerator.verify(Bytes.toString(rowKey), qualStr,
result.getValue(CF_BYTES, qualBytes)));
byte[] value = result.getValue(CF_BYTES, qualBytes);
assertTrue(LoadTestKVGenerator.verify(value, rowKey, qualBytes));
}
}
}

View File

@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
/**
* A command-line utility that reads, writes, and verifies data. Unlike
@ -119,7 +121,7 @@ public class LoadTestTool extends AbstractHBaseTool {
// Writer options
private int numWriterThreads = DEFAULT_NUM_THREADS;
private long minColsPerKey, maxColsPerKey;
private int minColsPerKey, maxColsPerKey;
private int minColDataSize, maxColDataSize;
private boolean isMultiPut;
@ -260,7 +262,7 @@ public class LoadTestTool extends AbstractHBaseTool {
int colIndex = 0;
minColsPerKey = 1;
maxColsPerKey = 2 * Long.parseLong(writeOpts[colIndex++]);
maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]);
int avgColDataSize =
parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE);
minColDataSize = avgColDataSize / 2;
@ -342,16 +344,16 @@ public class LoadTestTool extends AbstractHBaseTool {
initTestTable();
}
LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(
minColDataSize, maxColDataSize, minColsPerKey, maxColsPerKey, COLUMN_FAMILY);
if (isWrite) {
writerThreads = new MultiThreadedWriter(conf, tableName, COLUMN_FAMILY);
writerThreads = new MultiThreadedWriter(dataGen, conf, tableName);
writerThreads.setMultiPut(isMultiPut);
writerThreads.setColumnsPerKey(minColsPerKey, maxColsPerKey);
writerThreads.setDataSize(minColDataSize, maxColDataSize);
}
if (isRead) {
readerThreads = new MultiThreadedReader(conf, tableName, COLUMN_FAMILY,
verifyPercent);
readerThreads = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent);
readerThreads.setMaxErrors(maxReadErrors);
readerThreads.setKeyWindow(keyWindow);
}

View File

@ -18,12 +18,19 @@ package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
import org.apache.hadoop.util.StringUtils;
/**
@ -34,7 +41,6 @@ public abstract class MultiThreadedAction {
private static final Log LOG = LogFactory.getLog(MultiThreadedAction.class);
protected final byte[] tableName;
protected final byte[] columnFamily;
protected final Configuration conf;
protected int numThreads = 1;
@ -51,8 +57,69 @@ public abstract class MultiThreadedAction {
protected AtomicLong totalOpTimeMs = new AtomicLong();
protected boolean verbose = false;
protected int minDataSize = 256;
protected int maxDataSize = 1024;
protected LoadTestDataGenerator dataGenerator = null;
/**
* Default implementation of LoadTestDataGenerator that uses LoadTestKVGenerator, fixed
* set of column families, and random number of columns in range. The table for it can
* be created manually or, for example, via
* {@link HBaseTestingUtility#createPreSplitLoadTestTable(
* org.apache.hadoop.hbase.Configuration, byte[], byte[], Algorithm, DataBlockEncoding)}
*/
public static class DefaultDataGenerator extends LoadTestDataGenerator {
private byte[][] columnFamilies = null;
private int minColumnsPerKey;
private int maxColumnsPerKey;
private final Random random = new Random();
public DefaultDataGenerator(int minValueSize, int maxValueSize,
int minColumnsPerKey, int maxColumnsPerKey, byte[]... columnFamilies) {
super(minValueSize, maxValueSize);
this.columnFamilies = columnFamilies;
this.minColumnsPerKey = minColumnsPerKey;
this.maxColumnsPerKey = maxColumnsPerKey;
}
public DefaultDataGenerator(byte[]... columnFamilies) {
// Default values for tests that didn't care to provide theirs.
this(256, 1024, 1, 10, columnFamilies);
}
@Override
public byte[] getDeterministicUniqueKey(long keyBase) {
return LoadTestKVGenerator.md5PrefixedKey(keyBase).getBytes();
}
@Override
public byte[][] getColumnFamilies() {
return columnFamilies;
}
@Override
public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
int numColumns = minColumnsPerKey + random.nextInt(maxColumnsPerKey - minColumnsPerKey + 1);
byte[][] columns = new byte[numColumns][];
for (int i = 0; i < numColumns; ++i) {
columns[i] = Integer.toString(i).getBytes();
}
return columns;
}
@Override
public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
}
@Override
public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
return LoadTestKVGenerator.verify(value, rowKey, cf, column);
}
@Override
public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
return (columnSet.size() >= minColumnsPerKey) && (columnSet.size() <= maxColumnsPerKey);
}
}
/** "R" or "W" */
private String actionLetter;
@ -62,11 +129,11 @@ public abstract class MultiThreadedAction {
public static final int REPORTING_INTERVAL_MS = 5000;
public MultiThreadedAction(Configuration conf, byte[] tableName,
byte[] columnFamily, String actionLetter) {
public MultiThreadedAction(LoadTestDataGenerator dataGen, Configuration conf, byte[] tableName,
String actionLetter) {
this.conf = conf;
this.dataGenerator = dataGen;
this.tableName = tableName;
this.columnFamily = columnFamily;
this.actionLetter = actionLetter;
}
@ -165,17 +232,16 @@ public abstract class MultiThreadedAction {
}
}
public void setDataSize(int minDataSize, int maxDataSize) {
this.minDataSize = minDataSize;
this.maxDataSize = maxDataSize;
}
public void waitForFinish() {
while (numThreadsWorking.get() != 0) {
Threads.sleepWithoutInterrupt(1000);
}
}
public boolean isDone() {
return (numThreadsWorking.get() == 0);
}
protected void startThreads(Collection<? extends Thread> threads) {
numThreadsWorking.addAndGet(threads.size());
for (Thread thread : threads) {
@ -202,4 +268,77 @@ public abstract class MultiThreadedAction {
sb.append(v);
}
/**
* See {@link #verifyResultAgainstDataGenerator(Result, boolean, boolean)}.
* Does not verify cf/column integrity.
*/
public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues) {
return verifyResultAgainstDataGenerator(result, verifyValues, false);
}
/**
* Verifies the result from get or scan using the dataGenerator (that was presumably
* also used to generate said result).
* @param verifyValues verify that values in the result make sense for row/cf/column combination
* @param verifyCfAndColumnIntegrity verify that cf/column set in the result is complete. Note
* that to use this multiPut should be used, or verification
* has to happen after writes, otherwise there can be races.
* @return
*/
public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues,
boolean verifyCfAndColumnIntegrity) {
String rowKeyStr = Bytes.toString(result.getRow());
// See if we have any data at all.
if (result.isEmpty()) {
LOG.error("No data returned for key = [" + rowKeyStr + "]");
return false;
}
if (!verifyValues && !verifyCfAndColumnIntegrity) {
return true; // as long as we have something, we are good.
}
// See if we have all the CFs.
byte[][] expectedCfs = dataGenerator.getColumnFamilies();
if (verifyCfAndColumnIntegrity && (expectedCfs.length != result.getMap().size())) {
LOG.error("Bad family count for [" + rowKeyStr + "]: " + result.getMap().size());
return false;
}
// Verify each column family from get in the result.
for (byte[] cf : result.getMap().keySet()) {
String cfStr = Bytes.toString(cf);
Map<byte[], byte[]> columnValues = result.getFamilyMap(cf);
if (columnValues == null) {
LOG.error("No data for family [" + cfStr + "] for [" + rowKeyStr + "]");
return false;
}
// See if we have correct columns.
if (verifyCfAndColumnIntegrity
&& !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())) {
String colsStr = "";
for (byte[] col : columnValues.keySet()) {
if (colsStr.length() > 0) {
colsStr += ", ";
}
colsStr += "[" + Bytes.toString(col) + "]";
}
LOG.error("Bad columns for family [" + cfStr + "] for [" + rowKeyStr + "]: " + colsStr);
return false;
}
// See if values check out.
if (verifyValues) {
for (Map.Entry<byte[], byte[]> kv : columnValues.entrySet()) {
if (!dataGenerator.verify(result.getRow(), cf, kv.getKey(), kv.getValue())) {
LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
+ cfStr + "], column [" + Bytes.toString(kv.getKey()) + "]; value of length " +
+ kv.getValue().length);
return false;
}
}
}
}
return true;
}
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@ -31,6 +32,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
/** Creates multiple threads that read and verify previously written data */
@ -72,9 +74,9 @@ public class MultiThreadedReader extends MultiThreadedAction
private int maxErrors = DEFAULT_MAX_ERRORS;
private int keyWindow = DEFAULT_KEY_WINDOW;
public MultiThreadedReader(Configuration conf, byte[] tableName,
byte[] columnFamily, double verifyPercent) {
super(conf, tableName, columnFamily, "R");
public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
byte[] tableName, double verifyPercent) {
super(dataGen, conf, tableName, "R");
this.verifyPercent = verifyPercent;
}
@ -223,14 +225,22 @@ public class MultiThreadedReader extends MultiThreadedAction
}
private Get readKey(long keyToRead) {
Get get = new Get(
LoadTestKVGenerator.md5PrefixedKey(keyToRead).getBytes());
get.addFamily(columnFamily);
Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead));
String cfsString = "";
byte[][] columnFamilies = dataGenerator.getColumnFamilies();
for (byte[] cf : columnFamilies) {
get.addFamily(cf);
if (verbose) {
if (cfsString.length() > 0) {
cfsString += ", ";
}
cfsString += "[" + Bytes.toStringBinary(cf) + "]";
}
}
try {
if (verbose) {
LOG.info("[" + readerId + "] " + "Querying key " + keyToRead
+ ", cf " + Bytes.toStringBinary(columnFamily));
LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
}
queryKey(get, random.nextInt(100) < verifyPercent);
} catch (IOException e) {
@ -250,47 +260,38 @@ public class MultiThreadedReader extends MultiThreadedAction
Result result = table.get(get);
totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
numKeys.addAndGet(1);
// if we got no data report error
if (result.isEmpty()) {
if (!result.isEmpty()) {
if (verify) {
numKeysVerified.incrementAndGet();
}
} else {
HRegionLocation hloc = table.getRegionLocation(
Bytes.toBytes(rowKey));
LOG.info("Key = " + rowKey + ", RegionServer: "
+ hloc.getHostname());
numReadErrors.addAndGet(1);
LOG.error("No data returned, tried to get actions for key = "
+ rowKey + (writer == null ? "" : ", keys inserted by writer: " +
writer.numKeys.get() + ")"));
if (numReadErrors.get() > maxErrors) {
LOG.error("Aborting readers -- found more than " + maxErrors
+ " errors\n");
aborted = true;
}
}
if (result.getFamilyMap(columnFamily) != null) {
// increment number of columns read
numCols.addAndGet(result.getFamilyMap(columnFamily).size());
if (verify) {
// verify the result
List<KeyValue> keyValues = result.list();
for (KeyValue kv : keyValues) {
String qual = new String(kv.getQualifier());
// if something does not look right report it
if (!LoadTestKVGenerator.verify(rowKey, qual, kv.getValue())) {
numReadErrors.addAndGet(1);
LOG.error("Error checking data for key = " + rowKey
+ ", actionId = " + qual);
}
}
numKeysVerified.addAndGet(1);
boolean isOk = verifyResultAgainstDataGenerator(result, verify);
long numErrorsAfterThis = 0;
if (isOk) {
long cols = 0;
// Count the columns for reporting purposes.
for (byte[] cf : result.getMap().keySet()) {
cols += result.getFamilyMap(cf).size();
}
numCols.addAndGet(cols);
} else {
if (writer != null) {
LOG.error("At the time of failure, writer inserted " + writer.numKeys.get() + " keys");
}
numErrorsAfterThis = numReadErrors.incrementAndGet();
}
if (numErrorsAfterThis > maxErrors) {
LOG.error("Aborting readers -- found more than " + maxErrors + " errors");
aborted = true;
}
}
}
public long getNumReadFailures() {

View File

@ -18,9 +18,9 @@ package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@ -33,14 +33,13 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
/** Creates multiple threads that write key/values into the */
public class MultiThreadedWriter extends MultiThreadedAction {
private static final Log LOG = LogFactory.getLog(MultiThreadedWriter.class);
private long minColumnsPerKey = 1;
private long maxColumnsPerKey = 10;
private Set<HBaseWriterThread> writers = new HashSet<HBaseWriterThread>();
private boolean isMultiPut = false;
@ -51,8 +50,7 @@ public class MultiThreadedWriter extends MultiThreadedAction {
* {@link #insertedUpToKey}, the maximum key in the contiguous range of keys
* being inserted. This queue is supposed to stay small.
*/
private BlockingQueue<Long> insertedKeys =
new ArrayBlockingQueue<Long>(10000);
private BlockingQueue<Long> insertedKeys = new ArrayBlockingQueue<Long>(10000);
/**
* This is the current key to be inserted by any thread. Each thread does an
@ -78,9 +76,9 @@ public class MultiThreadedWriter extends MultiThreadedAction {
/** Enable this if used in conjunction with a concurrent reader. */
private boolean trackInsertedKeys;
public MultiThreadedWriter(Configuration conf, byte[] tableName,
byte[] columnFamily) {
super(conf, tableName, columnFamily, "W");
public MultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf,
byte[] tableName) {
super(dataGen, conf, tableName, "W");
}
/** Use multi-puts vs. separate puts for every column in a row */
@ -88,11 +86,6 @@ public class MultiThreadedWriter extends MultiThreadedAction {
this.isMultiPut = isMultiPut;
}
public void setColumnsPerKey(long minColumnsPerKey, long maxColumnsPerKey) {
this.minColumnsPerKey = minColumnsPerKey;
this.maxColumnsPerKey = maxColumnsPerKey;
}
@Override
public void start(long startKey, long endKey, int numThreads)
throws IOException {
@ -118,17 +111,9 @@ public class MultiThreadedWriter extends MultiThreadedAction {
startThreads(writers);
}
public static byte[] longToByteArrayKey(long rowKey) {
return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes();
}
private class HBaseWriterThread extends Thread {
private final HTable table;
private final Random random = new Random();
private final LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator(
minDataSize, maxDataSize);
public HBaseWriterThread(int writerId) throws IOException {
setName(getClass().getSimpleName() + "_" + writerId);
table = new HTable(conf, tableName);
@ -136,20 +121,36 @@ public class MultiThreadedWriter extends MultiThreadedAction {
public void run() {
try {
long rowKey;
while ((rowKey = nextKeyToInsert.getAndIncrement()) < endKey) {
long numColumns = minColumnsPerKey + Math.abs(random.nextLong())
% (maxColumnsPerKey - minColumnsPerKey);
long rowKeyBase;
byte[][] columnFamilies = dataGenerator.getColumnFamilies();
while ((rowKeyBase = nextKeyToInsert.getAndIncrement()) < endKey) {
byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase);
Put put = new Put(rowKey);
numKeys.addAndGet(1);
if (isMultiPut) {
multiPutInsertKey(rowKey, 0, numColumns);
} else {
for (long col = 0; col < numColumns; ++col) {
insert(rowKey, col);
int columnCount = 0;
for (byte[] cf : columnFamilies) {
String s;
byte[][] columns = dataGenerator.generateColumnsForCf(rowKey, cf);
for (byte[] column : columns) {
byte[] value = dataGenerator.generateValue(rowKey, cf, column);
put.add(cf, column, value);
++columnCount;
if (!isMultiPut) {
insert(put, rowKeyBase);
numCols.addAndGet(1);
put = new Put(rowKey);
}
}
}
if (isMultiPut) {
if (verbose) {
LOG.debug("Preparing put for key = [" + rowKey + "], " + columnCount + " columns");
}
insert(put, rowKeyBase);
numCols.addAndGet(columnCount);
}
if (trackInsertedKeys) {
insertedKeys.add(rowKey);
insertedKeys.add(rowKeyBase);
}
}
} finally {
@ -162,52 +163,14 @@ public class MultiThreadedWriter extends MultiThreadedAction {
}
}
public void insert(long rowKey, long col) {
Put put = new Put(longToByteArrayKey(rowKey));
String colAsStr = String.valueOf(col);
put.add(columnFamily, Bytes.toBytes(colAsStr),
dataGenerator.generateRandomSizeValue(rowKey, colAsStr));
public void insert(Put put, long keyBase) {
try {
long start = System.currentTimeMillis();
table.put(put);
numCols.addAndGet(1);
totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
} catch (IOException e) {
failedKeySet.add(rowKey);
LOG.error("Failed to insert: " + rowKey);
e.printStackTrace();
}
}
public void multiPutInsertKey(long rowKey, long startCol, long endCol) {
if (verbose) {
LOG.debug("Preparing put for key = " + rowKey + ", cols = ["
+ startCol + ", " + endCol + ")");
}
if (startCol >= endCol) {
return;
}
Put put = new Put(LoadTestKVGenerator.md5PrefixedKey(
rowKey).getBytes());
byte[] columnQualifier;
byte[] value;
for (long i = startCol; i < endCol; ++i) {
String qualStr = String.valueOf(i);
columnQualifier = qualStr.getBytes();
value = dataGenerator.generateRandomSizeValue(rowKey, qualStr);
put.add(columnFamily, columnQualifier, value);
}
try {
long start = System.currentTimeMillis();
table.put(put);
numCols.addAndGet(endCol - startCol);
totalOpTimeMs.addAndGet(
System.currentTimeMillis() - start);
} catch (IOException e) {
failedKeySet.add(rowKey);
failedKeySet.add(keyBase);
LOG.error("Failed to insert: " + keyBase);
e.printStackTrace();
}
}
@ -302,8 +265,7 @@ public class MultiThreadedWriter extends MultiThreadedAction {
* key, which requires a blocking queue and a consumer thread.
* @param enable whether to enable tracking the last inserted key
*/
void setTrackInsertedKeys(boolean enable) {
public void setTrackInsertedKeys(boolean enable) {
trackInsertedKeys = enable;
}
}

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
/**
* A command-line tool that spins up a local process-based cluster, loads
@ -59,8 +60,8 @@ public class RestartMetaTest extends AbstractHBaseTool {
private void loadData() throws IOException {
long startKey = 0;
long endKey = 100000;
long minColsPerKey = 5;
long maxColsPerKey = 15;
int minColsPerKey = 5;
int maxColsPerKey = 15;
int minColDataSize = 256;
int maxColDataSize = 256 * 3;
int numThreads = 10;
@ -74,11 +75,10 @@ public class RestartMetaTest extends AbstractHBaseTool {
System.out.printf("Client Threads: %d\n", numThreads);
// start the writers
MultiThreadedWriter writer = new MultiThreadedWriter(conf, TABLE_NAME,
LoadTestTool.COLUMN_FAMILY);
LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(
minColDataSize, maxColDataSize, minColsPerKey, maxColsPerKey, LoadTestTool.COLUMN_FAMILY);
MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
writer.setMultiPut(true);
writer.setColumnsPerKey(minColsPerKey, maxColsPerKey);
writer.setDataSize(minColDataSize, maxColDataSize);
writer.start(startKey, endKey, numThreads);
System.out.printf("Started loading data...");
writer.waitForFinish();

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -139,9 +140,10 @@ public class TestMiniClusterLoadSequential {
TEST_UTIL.waitUntilAllRegionsAssigned(numRegions);
writerThreads = new MultiThreadedWriter(conf, TABLE, CF);
LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(CF);
writerThreads = new MultiThreadedWriter(dataGen, conf, TABLE);
writerThreads.setMultiPut(isMultiPut);
readerThreads = new MultiThreadedReader(conf, TABLE, CF, 100);
readerThreads = new MultiThreadedReader(dataGen, conf, TABLE, 100);
}
protected int numKeys() {