diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java new file mode 100644 index 00000000000..6947b08aea0 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util.test; + +import java.util.Set; + +/** + * A generator of random data (keys/cfs/columns/values) for load testing. + * Contains a LoadTestKVGenerator as a matter of convenience, for generating and verifying values. + */ +public abstract class LoadTestDataGenerator { + protected final LoadTestKVGenerator kvGenerator; + + /** + * Initializes the object. + * @param minValueSize minimum size of the value generated by + * {@link #generateValue(byte[], byte[], byte[])}. + * @param maxValueSize maximum size of the value generated by + * {@link #generateValue(byte[], byte[], byte[])}. + */ + public LoadTestDataGenerator(int minValueSize, int maxValueSize) { + this.kvGenerator = new LoadTestKVGenerator(minValueSize, maxValueSize); + } + + /** + * Generates a deterministic, unique hashed row key from a number. That way, the user can + * keep track of numbers without having to deal with raw byte arrays, while key distribution + * is still ensured. + * @param keyBase Base number for a key, such as a loop counter. + */ + public abstract byte[] getDeterministicUniqueKey(long keyBase); + + /** + * Gets column families for the load test table. + * @return The array of byte[]s representing column family names. + */ + public abstract byte[][] getColumnFamilies(); + + /** + * Generates an applicable set of columns to be used for a particular key and family. + * @param rowKey The row key to generate for. + * @param cf The column family name to generate for. + * @return The array of byte[]s representing column names. + */ + public abstract byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf); + + /** + * Generates a value to be used for a particular row/cf/column. + * @param rowKey The row key to generate for. + * @param cf The column family name to generate for. + * @param column The column name to generate for. + * @return The value to use. + */ + public abstract byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column); + + /** + * Checks that columns for a rowKey and cf are valid if generated via + * {@link #generateColumnsForCf(byte[], byte[])}. + * @param rowKey The row key to verify for. + * @param cf The column family name to verify for. + * @param columnSet The column set (for example, encountered by read). + * @return True iff valid.
+ */ + public abstract boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet); + + /** + * Checks that value for a rowKey/cf/column is valid if generated via + * {@link #generateValue(byte[], byte[], byte[])}. + * @param rowKey The row key to verify for. + * @param cf The column family name to verify for. + * @param column The column name to verify for. + * @param value The value (for example, encountered by read). + * @return True iff valid. + */ + public abstract boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value); +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestKVGenerator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestKVGenerator.java index 63b1aa47576..5a64a1bb17f 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestKVGenerator.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestKVGenerator.java @@ -16,6 +16,7 @@ */ package org.apache.hadoop.hbase.util.test; +import java.util.Map; import java.util.Random; import org.apache.hadoop.hbase.util.Bytes; @@ -27,8 +28,6 @@ import org.apache.hadoop.hbase.util.MD5Hash; * hash. Values are generated by selecting value size in the configured range * and generating a pseudo-random sequence of bytes seeded by key, column * qualifier, and value size. - *
<p>
- * Not thread-safe, so a separate instance is needed for every writer thread/ */ public class LoadTestKVGenerator { @@ -49,13 +48,13 @@ public class LoadTestKVGenerator { /** * Verifies that the given byte array is the same as what would be generated - * for the given row key and qualifier. We are assuming that the value size - * is correct, and only verify the actual bytes. However, if the min/max - * value sizes are set sufficiently high, an accidental match should be + * for the given seed strings (row/cf/column/...). We are assuming that the + * value size is correct, and only verify the actual bytes. However, if the + * min/max value sizes are set sufficiently high, an accidental match should be * extremely improbable. */ - public static boolean verify(String rowKey, String qual, byte[] value) { - byte[] expectedData = getValueForRowColumn(rowKey, qual, value.length); + public static boolean verify(byte[] value, byte[]... seedStrings) { + byte[] expectedData = getValueForRowColumn(value.length, seedStrings); return Bytes.equals(expectedData, value); } @@ -74,27 +73,31 @@ public class LoadTestKVGenerator { /** * Generates a value for the given key index and column qualifier. Size is * selected randomly in the configured range. The generated value depends - * only on the combination of the key, qualifier, and the selected value - * size. This allows to verify the actual value bytes when reading, as done - * in {@link #verify(String, String, byte[])}. + * only on the combination of the strings passed (key/cf/column/...) and the selected + * value size. This allows verifying the actual value bytes when reading, as done + * in {@link #verify(byte[], byte[]...)}. + * This method is as thread-safe as the Random class. It appears that the worst bug ever + * found with the latter is that multiple threads will get some duplicate values, which + * we don't care about. */ - public byte[] generateRandomSizeValue(long key, String qual) { - String rowKey = md5PrefixedKey(key); + public byte[] generateRandomSizeValue(byte[]... seedStrings) { int dataSize = minValueSize; - if(minValueSize != maxValueSize){ + if (minValueSize != maxValueSize) { dataSize = minValueSize + randomForValueSize.nextInt(Math.abs(maxValueSize - minValueSize)); } - return getValueForRowColumn(rowKey, qual, dataSize); + return getValueForRowColumn(dataSize, seedStrings); } /** * Generates random bytes of the given size for the given row and column * qualifier. The random seed is fully determined by these parameters. */ - private static byte[] getValueForRowColumn(String rowKey, String qual, - int dataSize) { - Random seededRandom = new Random(rowKey.hashCode() + qual.hashCode() + - dataSize); + private static byte[] getValueForRowColumn(int dataSize, byte[]...
seedStrings) { + long seed = dataSize; + for (byte[] str : seedStrings) { + seed += Bytes.toString(str).hashCode(); + } + Random seededRandom = new Random(seed); byte[] randomBytes = new byte[dataSize]; seededRandom.nextBytes(randomBytes); return randomBytes; diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java index 98c104953b3..9eb627c938f 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java @@ -41,8 +41,8 @@ public class TestLoadTestKVGenerator { @Test public void testValueLength() { for (int i = 0; i < 1000; ++i) { - byte[] v = gen.generateRandomSizeValue(i, - String.valueOf(rand.nextInt())); + byte[] v = gen.generateRandomSizeValue(Integer.toString(i).getBytes(), + String.valueOf(rand.nextInt()).getBytes()); assertTrue(MIN_LEN <= v.length); assertTrue(v.length <= MAX_LEN); } @@ -52,12 +52,12 @@ public class TestLoadTestKVGenerator { public void testVerification() { for (int i = 0; i < 1000; ++i) { for (int qualIndex = 0; qualIndex < 20; ++qualIndex) { - String qual = String.valueOf(qualIndex); - byte[] v = gen.generateRandomSizeValue(i, qual); - String rowKey = LoadTestKVGenerator.md5PrefixedKey(i); - assertTrue(LoadTestKVGenerator.verify(rowKey, qual, v)); + byte[] qual = String.valueOf(qualIndex).getBytes(); + byte[] rowKey = LoadTestKVGenerator.md5PrefixedKey(i).getBytes(); + byte[] v = gen.generateRandomSizeValue(rowKey, qual); + assertTrue(LoadTestKVGenerator.verify(v, rowKey, qual)); v[0]++; - assertFalse(LoadTestKVGenerator.verify(rowKey, qual, v)); + assertFalse(LoadTestKVGenerator.verify(v, rowKey, qual)); } } } diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java new file mode 100644 index 00000000000..e14c204a4b4 --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.security.InvalidParameterException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicLong; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.LoadTestTool; +import org.apache.hadoop.hbase.util.MultiThreadedWriter; +import org.apache.hadoop.hbase.util.RegionSplitter; +import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; +import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Integration test that verifies lazy CF loading during scans by doing repeated scans + * with this feature enabled while multiple threads are continuously writing values, and + * verifying the results. + */ +@Category(IntegrationTests.class) +public class IntegrationTestLazyCfLoading { + private static final String TABLE_NAME = IntegrationTestLazyCfLoading.class.getSimpleName(); + private static final String TIMEOUT_KEY = "hbase.%s.timeout"; + + /** A soft test timeout; the actual duration of the test depends on the number of keys to put. */ + private static final int DEFAULT_TIMEOUT_MINUTES = 10; + + private static final int NUM_SERVERS = 1; + /** Set regions per server low to ensure splits happen during the test */ + private static final int REGIONS_PER_SERVER = 3; + private static final int KEYS_TO_WRITE_PER_SERVER = 20000; + private static final int WRITER_THREADS = 10; + private static final int WAIT_BETWEEN_SCANS_MS = 1000; + + private static final Log LOG = LogFactory.getLog(IntegrationTestLazyCfLoading.class); + private IntegrationTestingUtility util = new IntegrationTestingUtility(); + private final DataGenerator dataGen = new DataGenerator(); + + /** Custom LoadTestDataGenerator. Uses key generation and verification from + * LoadTestKVGenerator. Creates three column families: the first with an integer column to + * filter on; the second with an integer column that matches the first integer column (for + * test-specific verification) plus a byte[] value that is used for general verification; and + * the third with just the value.
+ */ + private static class DataGenerator extends LoadTestDataGenerator { + private static final int MIN_DATA_SIZE = 4096; + private static final int MAX_DATA_SIZE = 65536; + public static final byte[] ESSENTIAL_CF = Bytes.toBytes("essential"); + public static final byte[] JOINED_CF1 = Bytes.toBytes("joined"); + public static final byte[] JOINED_CF2 = Bytes.toBytes("joined2"); + public static final byte[] FILTER_COLUMN = Bytes.toBytes("filter"); + public static final byte[] VALUE_COLUMN = Bytes.toBytes("val"); + public static final long ACCEPTED_VALUE = 1L; + + private static final Map<byte[], byte[][]> columnMap = new TreeMap<byte[], byte[][]>( + Bytes.BYTES_COMPARATOR); + + private final AtomicLong expectedNumberOfKeys = new AtomicLong(0); + private final AtomicLong totalNumberOfKeys = new AtomicLong(0); + + public DataGenerator() { + super(MIN_DATA_SIZE, MAX_DATA_SIZE); + columnMap.put(ESSENTIAL_CF, new byte[][] { FILTER_COLUMN }); + columnMap.put(JOINED_CF1, new byte[][] { FILTER_COLUMN, VALUE_COLUMN }); + columnMap.put(JOINED_CF2, new byte[][] { VALUE_COLUMN }); + } + + public long getExpectedNumberOfKeys() { + return expectedNumberOfKeys.get(); + } + + public long getTotalNumberOfKeys() { + return totalNumberOfKeys.get(); + } + + @Override + public byte[] getDeterministicUniqueKey(long keyBase) { + return LoadTestKVGenerator.md5PrefixedKey(keyBase).getBytes(); + } + + @Override + public byte[][] getColumnFamilies() { + return columnMap.keySet().toArray(new byte[columnMap.size()][]); + } + + @Override + public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) { + return columnMap.get(cf); + } + + @Override + public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) { + if (Bytes.BYTES_COMPARATOR.compare(column, FILTER_COLUMN) == 0) { + // Random deterministic way to make some values "on" and others "off" for filters. + long value = Long.parseLong(Bytes.toString(rowKey, 0, 4), 16) & ACCEPTED_VALUE; + if (Bytes.BYTES_COMPARATOR.compare(cf, ESSENTIAL_CF) == 0) { + totalNumberOfKeys.incrementAndGet(); + if (value == ACCEPTED_VALUE) { + expectedNumberOfKeys.incrementAndGet(); + } + } + return Bytes.toBytes(value); + } else if (Bytes.BYTES_COMPARATOR.compare(column, VALUE_COLUMN) == 0) { + return kvGenerator.generateRandomSizeValue(rowKey, cf, column); + } + String error = "Unknown column " + Bytes.toString(column); + assert false : error; + throw new InvalidParameterException(error); + } + + @Override + public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) { + if (Bytes.BYTES_COMPARATOR.compare(column, FILTER_COLUMN) == 0) { + // Relies on the filter from getScanFilter being used. + return Bytes.toLong(value) == ACCEPTED_VALUE; + } else if (Bytes.BYTES_COMPARATOR.compare(column, VALUE_COLUMN) == 0) { + return LoadTestKVGenerator.verify(value, rowKey, cf, column); + } + return false; // some bogus value from read, we don't expect any such thing.
+ } + + @Override + public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) { + return columnMap.get(cf).length == columnSet.size(); + } + + public Filter getScanFilter() { + SingleColumnValueFilter scf = new SingleColumnValueFilter(ESSENTIAL_CF, FILTER_COLUMN, + CompareFilter.CompareOp.EQUAL, Bytes.toBytes(ACCEPTED_VALUE)); + scf.setFilterIfMissing(true); + return scf; + } + }; + + @Before + public void setUp() throws Exception { + LOG.info("Initializing cluster with " + NUM_SERVERS + " servers"); + util.initializeCluster(NUM_SERVERS); + LOG.info("Done initializing cluster"); + createTable(); + } + + private void createTable() throws Exception { + deleteTable(); + LOG.info("Creating table"); + HTableDescriptor htd = new HTableDescriptor(Bytes.toBytes(TABLE_NAME)); + for (byte[] cf : dataGen.getColumnFamilies()) { + htd.addFamily(new HColumnDescriptor(cf)); + } + int serverCount = util.getHBaseClusterInterface().getClusterStatus().getServersSize(); + byte[][] splits = new RegionSplitter.HexStringSplit().split(serverCount * REGIONS_PER_SERVER); + util.getHBaseAdmin().createTable(htd, splits); + LOG.info("Created table"); + } + + private void deleteTable() throws Exception { + if (util.getHBaseAdmin().tableExists(TABLE_NAME)) { + LOG.info("Deleting table"); + if (!util.getHBaseAdmin().isTableDisabled(TABLE_NAME)) { + util.getHBaseAdmin().disableTable(TABLE_NAME); + } + util.getHBaseAdmin().deleteTable(TABLE_NAME); + LOG.info("Deleted table"); + } + } + + @After + public void tearDown() throws Exception { + deleteTable(); + LOG.info("Restoring the cluster"); + util.restoreCluster(); + LOG.info("Done restoring the cluster"); + } + + @Test + public void testReadersAndWriters() throws Exception { + Configuration conf = util.getConfiguration(); + String timeoutKey = String.format(TIMEOUT_KEY, this.getClass().getSimpleName()); + long maxRuntime = conf.getLong(timeoutKey, DEFAULT_TIMEOUT_MINUTES); + long serverCount = util.getHBaseClusterInterface().getClusterStatus().getServersSize(); + long keysToWrite = serverCount * KEYS_TO_WRITE_PER_SERVER; + HTable table = new HTable(conf, Bytes.toBytes(TABLE_NAME)); + + // Create multi-threaded writer and start it. We write multiple columns/CFs and verify + // their integrity, therefore multi-put is necessary. + MultiThreadedWriter writer = + new MultiThreadedWriter(dataGen, conf, Bytes.toBytes(TABLE_NAME)); + writer.setMultiPut(true); + + LOG.info("Starting writer; the number of keys to write is " + keysToWrite); + writer.start(1, keysToWrite, WRITER_THREADS); + + // Now, do scans. + long now = EnvironmentEdgeManager.currentTimeMillis(); + long timeLimit = now + (maxRuntime * 60000); + boolean isWriterDone = false; + while (now < timeLimit && !isWriterDone) { + LOG.info("Starting the scan; wrote approximately " + + dataGen.getTotalNumberOfKeys() + " keys"); + isWriterDone = writer.isDone(); + if (isWriterDone) { + LOG.info("Scanning full result, writer is done"); + } + Scan scan = new Scan(); + for (byte[] cf : dataGen.getColumnFamilies()) { + scan.addFamily(cf); + } + scan.setFilter(dataGen.getScanFilter()); + scan.setLoadColumnFamiliesOnDemand(true); + // The number of keys we can expect from scan - lower bound (before scan). + // Not a strict lower bound - writer knows nothing about filters, so we report + // this from generator. Writer might have generated the value but not put it yet.
+ long onesGennedBeforeScan = dataGen.getExpectedNumberOfKeys(); + long startTs = EnvironmentEdgeManager.currentTimeMillis(); + ResultScanner results = table.getScanner(scan); + long resultCount = 0; + Result result = null; + // Verify and count the results. + while ((result = results.next()) != null) { + boolean isOk = writer.verifyResultAgainstDataGenerator(result, true, true); + Assert.assertTrue("Failed to verify [" + Bytes.toString(result.getRow()) + "]", isOk); + ++resultCount; + } + long timeTaken = EnvironmentEdgeManager.currentTimeMillis() - startTs; + // Verify the result count. + long onesGennedAfterScan = dataGen.getExpectedNumberOfKeys(); + Assert.assertTrue("Read " + resultCount + " keys when at most " + onesGennedAfterScan + + " were generated ", onesGennedAfterScan >= resultCount); + if (isWriterDone) { + Assert.assertTrue("Read " + resultCount + " keys; the writer is done and " + + onesGennedAfterScan + " keys were generated", onesGennedAfterScan == resultCount); + } else if (onesGennedBeforeScan * 0.9 > resultCount) { + LOG.warn("Read way too few keys (" + resultCount + "/" + onesGennedBeforeScan + + ") - there might be a problem, or the writer might just be slow"); + } + LOG.info("Scan took " + timeTaken + "ms"); + if (!isWriterDone) { + Thread.sleep(WAIT_BETWEEN_SCANS_MS); + now = EnvironmentEdgeManager.currentTimeMillis(); + } + } + Assert.assertEquals("There are write failures", 0, writer.getNumWriteFailures()); + Assert.assertTrue("Writer is not done", isWriterDone); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 313ceb9dbaa..ea39620eaef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3641,7 +3641,7 @@ public class HRegion implements HeapSize { // , Writable{ // First, check if we are at a stop row. If so, there are no more results. if (stopRow) { if (filter != null && filter.hasFilterRow()) { - filter.filterRow(results); + filter.filterRow(results); } return false; } @@ -3670,7 +3670,7 @@ public class HRegion implements HeapSize { // , Writable{ final boolean isEmptyRow = results.isEmpty(); // We have the part of the row necessary for filtering (all of it, usually). - // First filter with the filterRow(List<KeyValue>). + // First filter with the filterRow(List<KeyValue>).
if (filter != null && filter.hasFilterRow()) { filter.filterRow(results); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java index 7d81a60df5e..c5c1cd99a02 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java @@ -129,13 +129,12 @@ public class TestEncodedSeekers { private void doPuts(HRegion region) throws IOException{ LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator(MIN_VALUE_SIZE, MAX_VALUE_SIZE); for (int i = 0; i < NUM_ROWS; ++i) { - byte[] key = MultiThreadedWriter.longToByteArrayKey(i); + byte[] key = LoadTestKVGenerator.md5PrefixedKey(i).getBytes(); for (int j = 0; j < NUM_COLS_PER_ROW; ++j) { Put put = new Put(key); - String colAsStr = String.valueOf(j); - byte[] col = Bytes.toBytes(colAsStr); - byte[] value = dataGenerator.generateRandomSizeValue(i, colAsStr); - put.add(CF_BYTES, Bytes.toBytes(colAsStr), value); + byte[] col = Bytes.toBytes(String.valueOf(j)); + byte[] value = dataGenerator.generateRandomSizeValue(key, col); + put.add(CF_BYTES, col, value); if(VERBOSE){ KeyValue kvPut = new KeyValue(key, CF_BYTES, col, value); System.err.println(Strings.padFront(i+"", ' ', 4)+" "+kvPut); @@ -151,7 +150,7 @@ public class TestEncodedSeekers { private void doGets(HRegion region) throws IOException{ for (int i = 0; i < NUM_ROWS; ++i) { - final byte[] rowKey = MultiThreadedWriter.longToByteArrayKey(i); + final byte[] rowKey = LoadTestKVGenerator.md5PrefixedKey(i).getBytes(); for (int j = 0; j < NUM_COLS_PER_ROW; ++j) { final String qualStr = String.valueOf(j); if (VERBOSE) { @@ -163,8 +162,8 @@ public class TestEncodedSeekers { get.addColumn(CF_BYTES, qualBytes); Result result = region.get(get, null); assertEquals(1, result.size()); - assertTrue(LoadTestKVGenerator.verify(Bytes.toString(rowKey), qualStr, - result.getValue(CF_BYTES, qualBytes))); + byte[] value = result.getValue(CF_BYTES, qualBytes); + assertTrue(LoadTestKVGenerator.verify(value, rowKey, qualBytes)); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java index 744cd09a808..ca2f5860c74 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java @@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; /** * A command-line utility that reads, writes, and verifies data. 
Unlike @@ -119,7 +121,7 @@ public class LoadTestTool extends AbstractHBaseTool { // Writer options private int numWriterThreads = DEFAULT_NUM_THREADS; - private long minColsPerKey, maxColsPerKey; + private int minColsPerKey, maxColsPerKey; private int minColDataSize, maxColDataSize; private boolean isMultiPut; @@ -260,7 +262,7 @@ public class LoadTestTool extends AbstractHBaseTool { int colIndex = 0; minColsPerKey = 1; - maxColsPerKey = 2 * Long.parseLong(writeOpts[colIndex++]); + maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]); int avgColDataSize = parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE); minColDataSize = avgColDataSize / 2; @@ -342,16 +344,16 @@ public class LoadTestTool extends AbstractHBaseTool { initTestTable(); } + LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator( + minColDataSize, maxColDataSize, minColsPerKey, maxColsPerKey, COLUMN_FAMILY); + if (isWrite) { - writerThreads = new MultiThreadedWriter(conf, tableName, COLUMN_FAMILY); + writerThreads = new MultiThreadedWriter(dataGen, conf, tableName); writerThreads.setMultiPut(isMultiPut); - writerThreads.setColumnsPerKey(minColsPerKey, maxColsPerKey); - writerThreads.setDataSize(minColDataSize, maxColDataSize); } if (isRead) { - readerThreads = new MultiThreadedReader(conf, tableName, COLUMN_FAMILY, - verifyPercent); + readerThreads = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent); readerThreads.setMaxErrors(maxReadErrors); readerThreads.setKeyWindow(keyWindow); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java index b312cca1d84..c988eca2e97 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java @@ -18,12 +18,19 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; import java.util.Collection; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Random; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; +import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator; import org.apache.hadoop.util.StringUtils; /** @@ -34,7 +41,6 @@ public abstract class MultiThreadedAction { private static final Log LOG = LogFactory.getLog(MultiThreadedAction.class); protected final byte[] tableName; - protected final byte[] columnFamily; protected final Configuration conf; protected int numThreads = 1; @@ -51,8 +57,69 @@ public abstract class MultiThreadedAction { protected AtomicLong totalOpTimeMs = new AtomicLong(); protected boolean verbose = false; - protected int minDataSize = 256; - protected int maxDataSize = 1024; + protected LoadTestDataGenerator dataGenerator = null; + + /** + * Default implementation of LoadTestDataGenerator that uses LoadTestKVGenerator, fixed + * set of column families, and random number of columns in range. 
The table for it can + * be created manually or, for example, via + * {@link HBaseTestingUtility#createPreSplitLoadTestTable( + * org.apache.hadoop.conf.Configuration, byte[], byte[], Algorithm, DataBlockEncoding)} + */ + public static class DefaultDataGenerator extends LoadTestDataGenerator { + private byte[][] columnFamilies = null; + private int minColumnsPerKey; + private int maxColumnsPerKey; + private final Random random = new Random(); + + public DefaultDataGenerator(int minValueSize, int maxValueSize, + int minColumnsPerKey, int maxColumnsPerKey, byte[]... columnFamilies) { + super(minValueSize, maxValueSize); + this.columnFamilies = columnFamilies; + this.minColumnsPerKey = minColumnsPerKey; + this.maxColumnsPerKey = maxColumnsPerKey; + } + + public DefaultDataGenerator(byte[]... columnFamilies) { + // Default values for tests that didn't care to provide theirs. + this(256, 1024, 1, 10, columnFamilies); + } + + @Override + public byte[] getDeterministicUniqueKey(long keyBase) { + return LoadTestKVGenerator.md5PrefixedKey(keyBase).getBytes(); + } + + @Override + public byte[][] getColumnFamilies() { + return columnFamilies; + } + + @Override + public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) { + int numColumns = minColumnsPerKey + random.nextInt(maxColumnsPerKey - minColumnsPerKey + 1); + byte[][] columns = new byte[numColumns][]; + for (int i = 0; i < numColumns; ++i) { + columns[i] = Integer.toString(i).getBytes(); + } + return columns; + } + + @Override + public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) { + return kvGenerator.generateRandomSizeValue(rowKey, cf, column); + } + + @Override + public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) { + return LoadTestKVGenerator.verify(value, rowKey, cf, column); + } + + @Override + public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) { + return (columnSet.size() >= minColumnsPerKey) && (columnSet.size() <= maxColumnsPerKey); + } + } /** "R" or "W" */ private String actionLetter; @@ -62,11 +129,11 @@ public abstract class MultiThreadedAction { public static final int REPORTING_INTERVAL_MS = 5000; - public MultiThreadedAction(Configuration conf, byte[] tableName, - byte[] columnFamily, String actionLetter) { + public MultiThreadedAction(LoadTestDataGenerator dataGen, Configuration conf, byte[] tableName, + String actionLetter) { this.conf = conf; + this.dataGenerator = dataGen; this.tableName = tableName; - this.columnFamily = columnFamily; this.actionLetter = actionLetter; } @@ -165,17 +232,16 @@ public abstract class MultiThreadedAction { } } - public void setDataSize(int minDataSize, int maxDataSize) { - this.minDataSize = minDataSize; - this.maxDataSize = maxDataSize; - } - public void waitForFinish() { while (numThreadsWorking.get() != 0) { Threads.sleepWithoutInterrupt(1000); } } + public boolean isDone() { + return (numThreadsWorking.get() == 0); + } + protected void startThreads(Collection<? extends Thread> threads) { numThreadsWorking.addAndGet(threads.size()); for (Thread thread : threads) { @@ -202,4 +268,77 @@ public abstract class MultiThreadedAction { sb.append(v); } + /** + * See {@link #verifyResultAgainstDataGenerator(Result, boolean, boolean)}. + * Does not verify cf/column integrity.
+ */ + public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues) { + return verifyResultAgainstDataGenerator(result, verifyValues, false); + } + + /** + * Verifies the result from get or scan using the dataGenerator (that was presumably + * also used to generate said result). + * @param verifyValues verify that values in the result make sense for the row/cf/column + * combination + * @param verifyCfAndColumnIntegrity verify that the cf/column set in the result is complete. + * Note that for this to work, multi-put should be used, or + * verification has to happen after writes; otherwise there + * can be races. + * @return true if the result matches what the data generator would produce, false otherwise. + */ + public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues, + boolean verifyCfAndColumnIntegrity) { + String rowKeyStr = Bytes.toString(result.getRow()); + + // See if we have any data at all. + if (result.isEmpty()) { + LOG.error("No data returned for key = [" + rowKeyStr + "]"); + return false; + } + + if (!verifyValues && !verifyCfAndColumnIntegrity) { + return true; // as long as we have something, we are good. + } + + // See if we have all the CFs. + byte[][] expectedCfs = dataGenerator.getColumnFamilies(); + if (verifyCfAndColumnIntegrity && (expectedCfs.length != result.getMap().size())) { + LOG.error("Bad family count for [" + rowKeyStr + "]: " + result.getMap().size()); + return false; + } + + // Verify each column family from get in the result. + for (byte[] cf : result.getMap().keySet()) { + String cfStr = Bytes.toString(cf); + Map<byte[], byte[]> columnValues = result.getFamilyMap(cf); + if (columnValues == null) { + LOG.error("No data for family [" + cfStr + "] for [" + rowKeyStr + "]"); + return false; + } + // See if we have correct columns. + if (verifyCfAndColumnIntegrity + && !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())) { + String colsStr = ""; + for (byte[] col : columnValues.keySet()) { + if (colsStr.length() > 0) { + colsStr += ", "; + } + colsStr += "[" + Bytes.toString(col) + "]"; + } + LOG.error("Bad columns for family [" + cfStr + "] for [" + rowKeyStr + "]: " + colsStr); + return false; + } + // See if values check out.
+ if (verifyValues) { + for (Map.Entry<byte[], byte[]> kv : columnValues.entrySet()) { + if (!dataGenerator.verify(result.getRow(), cf, kv.getKey(), kv.getValue())) { + LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + + cfStr + "], column [" + Bytes.toString(kv.getKey()) + "]; value of length " + + + kv.getValue().length); + return false; + } + } + } + } + return true; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java index f585dbc5777..eb0c268b0af 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -31,6 +32,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator; /** Creates multiple threads that read and verify previously written data */ @@ -72,9 +74,9 @@ public class MultiThreadedReader extends MultiThreadedAction private int maxErrors = DEFAULT_MAX_ERRORS; private int keyWindow = DEFAULT_KEY_WINDOW; - public MultiThreadedReader(Configuration conf, byte[] tableName, - byte[] columnFamily, double verifyPercent) { - super(conf, tableName, columnFamily, "R"); + public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf, + byte[] tableName, double verifyPercent) { + super(dataGen, conf, tableName, "R"); this.verifyPercent = verifyPercent; } @@ -223,14 +225,22 @@ } private Get readKey(long keyToRead) { - Get get = new Get( - LoadTestKVGenerator.md5PrefixedKey(keyToRead).getBytes()); - get.addFamily(columnFamily); + Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead)); + String cfsString = ""; + byte[][] columnFamilies = dataGenerator.getColumnFamilies(); + for (byte[] cf : columnFamilies) { + get.addFamily(cf); + if (verbose) { + if (cfsString.length() > 0) { + cfsString += ", "; + } + cfsString += "[" + Bytes.toStringBinary(cf) + "]"; + } + } try { if (verbose) { - LOG.info("[" + readerId + "] " + "Querying key " + keyToRead - + ", cf " + Bytes.toStringBinary(columnFamily)); + LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString); } queryKey(get, random.nextInt(100) < verifyPercent); } catch (IOException e) { @@ -250,47 +260,38 @@ Result result = table.get(get); totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); numKeys.addAndGet(1); - - // if we got no data report error - if (result.isEmpty()) { + if (!result.isEmpty()) { + if (verify) { + numKeysVerified.incrementAndGet(); + } + } else { HRegionLocation hloc = table.getRegionLocation( Bytes.toBytes(rowKey)); LOG.info("Key = " + rowKey + ", RegionServer: " + hloc.getHostname()); - numReadErrors.addAndGet(1); - LOG.error("No data returned, tried to get actions for key = " - + rowKey + (writer == null ?
"" : ", keys inserted by writer: " + - writer.numKeys.get() + ")")); - - if (numReadErrors.get() > maxErrors) { - LOG.error("Aborting readers -- found more than " + maxErrors - + " errors\n"); - aborted = true; - } } - if (result.getFamilyMap(columnFamily) != null) { - // increment number of columns read - numCols.addAndGet(result.getFamilyMap(columnFamily).size()); - - if (verify) { - // verify the result - List keyValues = result.list(); - for (KeyValue kv : keyValues) { - String qual = new String(kv.getQualifier()); - - // if something does not look right report it - if (!LoadTestKVGenerator.verify(rowKey, qual, kv.getValue())) { - numReadErrors.addAndGet(1); - LOG.error("Error checking data for key = " + rowKey - + ", actionId = " + qual); - } - } - numKeysVerified.addAndGet(1); + boolean isOk = verifyResultAgainstDataGenerator(result, verify); + long numErrorsAfterThis = 0; + if (isOk) { + long cols = 0; + // Count the columns for reporting purposes. + for (byte[] cf : result.getMap().keySet()) { + cols += result.getFamilyMap(cf).size(); } + numCols.addAndGet(cols); + } else { + if (writer != null) { + LOG.error("At the time of failure, writer inserted " + writer.numKeys.get() + " keys"); + } + numErrorsAfterThis = numReadErrors.incrementAndGet(); + } + + if (numErrorsAfterThis > maxErrors) { + LOG.error("Aborting readers -- found more than " + maxErrors + " errors"); + aborted = true; } } - } public long getNumReadFailures() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java index e73bc57be8f..d6aca2c455e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java @@ -18,9 +18,9 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; import java.util.HashSet; +import java.util.Map; import java.util.PriorityQueue; import java.util.Queue; -import java.util.Random; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -33,14 +33,13 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; /** Creates multiple threads that write key/values into the */ public class MultiThreadedWriter extends MultiThreadedAction { private static final Log LOG = LogFactory.getLog(MultiThreadedWriter.class); - private long minColumnsPerKey = 1; - private long maxColumnsPerKey = 10; private Set writers = new HashSet(); private boolean isMultiPut = false; @@ -51,8 +50,7 @@ public class MultiThreadedWriter extends MultiThreadedAction { * {@link #insertedUpToKey}, the maximum key in the contiguous range of keys * being inserted. This queue is supposed to stay small. */ - private BlockingQueue insertedKeys = - new ArrayBlockingQueue(10000); + private BlockingQueue insertedKeys = new ArrayBlockingQueue(10000); /** * This is the current key to be inserted by any thread. Each thread does an @@ -78,9 +76,9 @@ public class MultiThreadedWriter extends MultiThreadedAction { /** Enable this if used in conjunction with a concurrent reader. 
*/ private boolean trackInsertedKeys; - public MultiThreadedWriter(Configuration conf, byte[] tableName, - byte[] columnFamily) { - super(conf, tableName, columnFamily, "W"); + public MultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf, + byte[] tableName) { + super(dataGen, conf, tableName, "W"); } /** Use multi-puts vs. separate puts for every column in a row */ @@ -88,11 +86,6 @@ public class MultiThreadedWriter extends MultiThreadedAction { this.isMultiPut = isMultiPut; } - public void setColumnsPerKey(long minColumnsPerKey, long maxColumnsPerKey) { - this.minColumnsPerKey = minColumnsPerKey; - this.maxColumnsPerKey = maxColumnsPerKey; - } - @Override public void start(long startKey, long endKey, int numThreads) throws IOException { @@ -118,17 +111,9 @@ public class MultiThreadedWriter extends MultiThreadedAction { startThreads(writers); } - public static byte[] longToByteArrayKey(long rowKey) { - return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes(); - } - private class HBaseWriterThread extends Thread { private final HTable table; - private final Random random = new Random(); - private final LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator( - minDataSize, maxDataSize); - public HBaseWriterThread(int writerId) throws IOException { setName(getClass().getSimpleName() + "_" + writerId); table = new HTable(conf, tableName); @@ -136,20 +121,36 @@ public void run() { try { - long rowKey; - while ((rowKey = nextKeyToInsert.getAndIncrement()) < endKey) { - long numColumns = minColumnsPerKey + Math.abs(random.nextLong()) - % (maxColumnsPerKey - minColumnsPerKey); + long rowKeyBase; + byte[][] columnFamilies = dataGenerator.getColumnFamilies(); + while ((rowKeyBase = nextKeyToInsert.getAndIncrement()) < endKey) { + byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase); + Put put = new Put(rowKey); numKeys.addAndGet(1); - if (isMultiPut) { - multiPutInsertKey(rowKey, 0, numColumns); - } else { - for (long col = 0; col < numColumns; ++col) { - insert(rowKey, col); + int columnCount = 0; + for (byte[] cf : columnFamilies) { + byte[][] columns = dataGenerator.generateColumnsForCf(rowKey, cf); + for (byte[] column : columns) { + byte[] value = dataGenerator.generateValue(rowKey, cf, column); + put.add(cf, column, value); + ++columnCount; + if (!isMultiPut) { + insert(put, rowKeyBase); + numCols.addAndGet(1); + put = new Put(rowKey); + } } } + if (isMultiPut) { + if (verbose) { + LOG.debug("Preparing put for key = [" + Bytes.toString(rowKey) + "], " + columnCount + " columns"); + } + insert(put, rowKeyBase); + numCols.addAndGet(columnCount); + } if (trackInsertedKeys) { - insertedKeys.add(rowKey); + insertedKeys.add(rowKeyBase); } } } finally { @@ -162,52 +163,14 @@ } } - public void insert(long rowKey, long col) { - Put put = new Put(longToByteArrayKey(rowKey)); - String colAsStr = String.valueOf(col); - put.add(columnFamily, Bytes.toBytes(colAsStr), - dataGenerator.generateRandomSizeValue(rowKey, colAsStr)); + public void insert(Put put, long keyBase) { try { long start = System.currentTimeMillis(); table.put(put); - numCols.addAndGet(1); totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); } catch (IOException e) { - failedKeySet.add(rowKey); - LOG.error("Failed to insert: " + rowKey); - e.printStackTrace(); - } - } - - public void multiPutInsertKey(long rowKey, long startCol, long endCol) { - if (verbose) {
LOG.debug("Preparing put for key = " + rowKey + ", cols = [" - + startCol + ", " + endCol + ")"); - } - - if (startCol >= endCol) { - return; - } - - Put put = new Put(LoadTestKVGenerator.md5PrefixedKey( - rowKey).getBytes()); - byte[] columnQualifier; - byte[] value; - for (long i = startCol; i < endCol; ++i) { - String qualStr = String.valueOf(i); - columnQualifier = qualStr.getBytes(); - value = dataGenerator.generateRandomSizeValue(rowKey, qualStr); - put.add(columnFamily, columnQualifier, value); - } - - try { - long start = System.currentTimeMillis(); - table.put(put); - numCols.addAndGet(endCol - startCol); - totalOpTimeMs.addAndGet( - System.currentTimeMillis() - start); - } catch (IOException e) { - failedKeySet.add(rowKey); + failedKeySet.add(keyBase); + LOG.error("Failed to insert: " + keyBase); e.printStackTrace(); } } @@ -302,8 +265,7 @@ public class MultiThreadedWriter extends MultiThreadedAction { * key, which requires a blocking queue and a consumer thread. * @param enable whether to enable tracking the last inserted key */ - void setTrackInsertedKeys(boolean enable) { + public void setTrackInsertedKeys(boolean enable) { trackInsertedKeys = enable; } - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java index 8f02f0b2c33..421db050adb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; /** * A command-line tool that spins up a local process-based cluster, loads @@ -59,8 +60,8 @@ public class RestartMetaTest extends AbstractHBaseTool { private void loadData() throws IOException { long startKey = 0; long endKey = 100000; - long minColsPerKey = 5; - long maxColsPerKey = 15; + int minColsPerKey = 5; + int maxColsPerKey = 15; int minColDataSize = 256; int maxColDataSize = 256 * 3; int numThreads = 10; @@ -74,11 +75,10 @@ public class RestartMetaTest extends AbstractHBaseTool { System.out.printf("Client Threads: %d\n", numThreads); // start the writers - MultiThreadedWriter writer = new MultiThreadedWriter(conf, TABLE_NAME, - LoadTestTool.COLUMN_FAMILY); + LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator( + minColDataSize, maxColDataSize, minColsPerKey, maxColsPerKey, LoadTestTool.COLUMN_FAMILY); + MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME); writer.setMultiPut(true); - writer.setColumnsPerKey(minColsPerKey, maxColsPerKey); - writer.setDataSize(minColDataSize, maxColDataSize); writer.start(startKey, endKey, numThreads); System.out.printf("Started loading data..."); writer.waitForFinish(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java index f57dcbe37d6..4d37b2e4d28 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.TableNotFoundException; import 
org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -139,9 +140,10 @@ public class TestMiniClusterLoadSequential { TEST_UTIL.waitUntilAllRegionsAssigned(numRegions); - writerThreads = new MultiThreadedWriter(conf, TABLE, CF); + LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(CF); + writerThreads = new MultiThreadedWriter(dataGen, conf, TABLE); writerThreads.setMultiPut(isMultiPut); - readerThreads = new MultiThreadedReader(conf, TABLE, CF, 100); + readerThreads = new MultiThreadedReader(dataGen, conf, TABLE, 100); } protected int numKeys() {
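
The central trick in the reworked LoadTestKVGenerator above is that values never have to be stored for later comparison: each value is a pseudo-random byte sequence whose Random seed is fully determined by the value size plus the hash codes of the seed strings (row key, column family, column), so a reader holding only the Result can regenerate the expected bytes and compare. Below is a minimal, self-contained sketch of that scheme using only the JDK; the class and method names are illustrative, not the HBase ones, and new String(s) stands in for Bytes.toString(s).

import java.util.Arrays;
import java.util.Random;

public class SeededValueSketch {
  // Mirrors getValueForRowColumn: the seed is the size plus seed-string hash codes.
  static byte[] valueFor(int size, byte[]... seeds) {
    long seed = size;
    for (byte[] s : seeds) {
      seed += new String(s).hashCode();
    }
    byte[] out = new byte[size];
    new Random(seed).nextBytes(out); // deterministic for a given seed
    return out;
  }

  // Mirrors verify(byte[], byte[]...): regenerate from the observed length and compare.
  static boolean verify(byte[] value, byte[]... seeds) {
    return Arrays.equals(valueFor(value.length, seeds), value);
  }

  public static void main(String[] args) {
    byte[] row = "00c2a-row".getBytes(), cf = "cf".getBytes(), col = "0".getBytes();
    byte[] v = valueFor(16, row, cf, col);
    System.out.println(verify(v, row, cf, col)); // true
    v[0]++; // corrupt a single byte, as in TestLoadTestKVGenerator
    System.out.println(verify(v, row, cf, col)); // false
  }
}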
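
For callers, the refactoring boils down to building one LoadTestDataGenerator up front and handing the same instance to both the writer and the reader, which is exactly what the LoadTestTool and RestartMetaTest hunks do. A hedged wiring sketch, assuming a Configuration conf and a byte[] tableName are already in scope (the numeric arguments are the defaults DefaultDataGenerator uses; the column family name is made up):

// The same generator instance drives key generation, writing, and read-side verification.
LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(
    256, 1024,  // min/max value size in bytes
    1, 10,      // min/max columns per key
    Bytes.toBytes("test_cf"));

MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tableName);
writer.setMultiPut(true);    // one multi-column Put per row, so cf/column integrity is verifiable
writer.start(0, 100000, 10); // keys [0, 100000), 10 writer threads
writer.waitForFinish();

MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, tableName, 100);
reader.start(0, 100000, 10); // verify 100% of reads against the same generator
reader.waitForFinish();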