HBASE-9858 Integration test and LoadTestTool support for cell Visibility

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1555145 13f79535-47bb-0310-9956-ffa450edef68
anoopsamjohn 2014-01-03 16:44:59 +00:00
parent 57c8d90ac4
commit e47fa6826b
18 changed files with 471 additions and 130 deletions

View File

@ -19,9 +19,10 @@
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -32,12 +33,15 @@ import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Sets;
/**
* A base class for tests that do something with the cluster while running
* {@link LoadTestTool} to write and verify some data.
*/
@Category(IntegrationTests.class)
public class IntegrationTestIngest extends IntegrationTestBase {
public static final char HIPHEN = '-';
private static final int SERVER_COUNT = 4; // number of slaves for the smallest cluster
private static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;
private static final long JUNIT_RUN_TIME = 10 * 60 * 1000;
@ -78,11 +82,11 @@ public class IntegrationTestIngest extends IntegrationTestBase {
@Test
public void testIngest() throws Exception {
runIngestTest(JUNIT_RUN_TIME, 2500, 10, 1024, 10, false, 10);
runIngestTest(JUNIT_RUN_TIME, 2500, 10, 1024, 10);
}
private void internalRunIngestTest(long runTime) throws Exception {
runIngestTest(runTime, 2500, 10, 1024, 10, false, 10);
runIngestTest(runTime, 2500, 10, 1024, 10);
}
@Override
@ -101,8 +105,8 @@ public class IntegrationTestIngest extends IntegrationTestBase {
}
}
protected void runIngestTest(long defaultRunTime, int keysPerServerPerIter,
int colsPerKey, int recordSize, int writeThreads, boolean useTags, int maxTagsPerKey) throws Exception {
protected void runIngestTest(long defaultRunTime, int keysPerServerPerIter, int colsPerKey,
int recordSize, int writeThreads) throws Exception {
LOG.info("Running ingest");
LOG.info("Cluster size:" + util.getHBaseClusterInterface().getClusterStatus().getServersSize());
@ -117,45 +121,23 @@ public class IntegrationTestIngest extends IntegrationTestBase {
((runtime - (System.currentTimeMillis() - start))/60000) + " min");
int ret = -1;
if (useTags) {
ret = loadTool.run(new String[] { "-tn", getTablename(), "-write",
String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), "-start_key",
String.valueOf(startKey), "-num_keys", String.valueOf(numKeys), "-skip_init",
"-usetags", "-num_tags", String.format("1:%d", maxTagsPerKey) });
} else {
ret = loadTool.run(new String[] { "-tn", getTablename(), "-write",
String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), "-start_key",
String.valueOf(startKey), "-num_keys", String.valueOf(numKeys), "-skip_init" });
}
ret = loadTool.run(getArgsForLoadTestTool("-write",
String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys));
if (0 != ret) {
String errorMsg = "Load failed with error code " + ret;
LOG.error(errorMsg);
Assert.fail(errorMsg);
}
if (useTags) {
ret = loadTool.run(new String[] { "-tn", getTablename(), "-update",
String.format("60:%d", writeThreads), "-start_key", String.valueOf(startKey),
"-num_keys", String.valueOf(numKeys), "-skip_init", "-usetags", "-num_tags",
String.format("1:%d", maxTagsPerKey) });
} else {
ret = loadTool.run(new String[] { "-tn", getTablename(), "-update",
String.format("60:%d", writeThreads), "-start_key", String.valueOf(startKey),
"-num_keys", String.valueOf(numKeys), "-skip_init" });
}
ret = loadTool.run(getArgsForLoadTestTool("-update", String.format("60:%d", writeThreads),
startKey, numKeys));
if (0 != ret) {
String errorMsg = "Update failed with error code " + ret;
LOG.error(errorMsg);
Assert.fail(errorMsg);
}
if (useTags) {
ret = loadTool.run(new String[] { "-tn", getTablename(), "-read", "100:20", "-start_key",
String.valueOf(startKey), "-num_keys", String.valueOf(numKeys), "-skip_init",
"-usetags", "-num_tags", String.format("1:%d", maxTagsPerKey) });
} else {
ret = loadTool.run(new String[] { "-tn", getTablename(), "-read", "100:20", "-start_key",
String.valueOf(startKey), "-num_keys", String.valueOf(numKeys), "-skip_init" });
}
ret = loadTool.run(getArgsForLoadTestTool("-read", "100:20", startKey, numKeys));
if (0 != ret) {
String errorMsg = "Verification failed with error code " + ret;
LOG.error(errorMsg);
@ -165,6 +147,21 @@ public class IntegrationTestIngest extends IntegrationTestBase {
}
}
protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
long numKeys) {
List<String> args = new ArrayList<String>();
args.add("-tn");
args.add(getTablename());
args.add(mode);
args.add(modeSpecificArg);
args.add("-start_key");
args.add(String.valueOf(startKey));
args.add("-num_keys");
args.add(String.valueOf(numKeys));
args.add("-skip_init");
return args.toArray(new String[args.size()]);
}
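For illustration, with the parameters testIngest uses above (10 columns per key, 1024 byte values, 10 writer threads) and assumed values for the runtime-computed start key and key count, the -write phase invokes LoadTestTool with an argument array equivalent to:
new String[] { "-tn", getTablename(), "-write", "10:1024:10",
    "-start_key", "0", "-num_keys", "100000", "-skip_init" }
Subclasses hook in here: the getArgsForLoadTestTool overrides in the classes below append a -generator option to this common argument list.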
/** Estimates a data size based on the cluster size */
private long getNumKeys(int keysPerServer)
throws IOException {

View File

@ -17,20 +17,46 @@
*/
package org.apache.hadoop.hbase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.util.LoadTestDataGeneratorWithTags;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.junit.experimental.categories.Category;
@Category(IntegrationTests.class)
public class IntegrationTestIngestWithTags extends IntegrationTestIngest {
private static final char COLON = ':';
private int minTagsPerKey = 1, maxTagsPerKey = 10;
private int minTagLength = 16, maxTagLength = 512;
@Override
public void setUpCluster() throws Exception {
getTestingUtil(conf).getConfiguration().setInt("hfile.format.version", 3);
getTestingUtil(conf).getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, 3);
super.setUpCluster();
}
@Override
protected void runIngestTest(long defaultRunTime, int keysPerServerPerIter, int colsPerKey,
int recordSize, int writeThreads, boolean useTags, int maxTagsPerKey) throws Exception {
super.runIngestTest(defaultRunTime, keysPerServerPerIter, colsPerKey, recordSize, writeThreads,
true, 10);
protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
long numKeys) {
String[] args = super.getArgsForLoadTestTool(mode, modeSpecificArg, startKey, numKeys);
List<String> tmp = new ArrayList<String>(Arrays.asList(args));
// LoadTestDataGeneratorWithTags:minNumTags:maxNumTags:minTagLength:maxTagLength
tmp.add(HIPHEN + LoadTestTool.OPT_GENERATOR);
StringBuilder sb = new StringBuilder(LoadTestDataGeneratorWithTags.class.getName());
sb.append(COLON);
sb.append(minTagsPerKey);
sb.append(COLON);
sb.append(maxTagsPerKey);
sb.append(COLON);
sb.append(minTagLength);
sb.append(COLON);
sb.append(maxTagLength);
tmp.add(sb.toString());
return tmp.toArray(new String[tmp.size()]);
}
}
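With the defaults above (minTagsPerKey=1, maxTagsPerKey=10, minTagLength=16, maxTagLength=512), the two arguments this override appends come out as:
"-generator"
"org.apache.hadoop.hbase.util.LoadTestDataGeneratorWithTags:1:10:16:512"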

View File

@ -0,0 +1,123 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.visibility.LoadTestDataGeneratorWithVisibilityLabels;
import org.apache.hadoop.hbase.security.visibility.VisibilityClient;
import org.apache.hadoop.hbase.security.visibility.VisibilityController;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.junit.experimental.categories.Category;
@Category(IntegrationTests.class)
public class IntegrationTestIngestWithVisibilityLabels extends IntegrationTestIngest {
private static final char COMMA = ',';
private static final char COLON = ':';
private static final String[] LABELS = { "secret", "topsecret", "confidential", "public",
"private" };
private static final String[] VISIBILITY_EXPS = { "secret & confidential & !private",
"topsecret | confidential", "confidential & private", "public", "topsecret & private",
"!public | private", "(secret | topsecret) & private" };
private static final List<List<String>> AUTHS = new ArrayList<List<String>>();
static {
ArrayList<String> tmp = new ArrayList<String>();
tmp.add("secret");
tmp.add("confidential");
AUTHS.add(tmp);
tmp = new ArrayList<String>();
tmp.add("topsecret");
AUTHS.add(tmp);
tmp = new ArrayList<String>();
tmp.add("confidential");
tmp.add("private");
AUTHS.add(tmp);
tmp = new ArrayList<String>();
tmp.add("public");
AUTHS.add(tmp);
tmp = new ArrayList<String>();
tmp.add("topsecret");
tmp.add("private");
AUTHS.add(tmp);
tmp = new ArrayList<String>();
tmp.add("confidential");
AUTHS.add(tmp);
tmp = new ArrayList<String>();
tmp.add("topsecret");
tmp.add("private");
AUTHS.add(tmp);
}
@Override
public void setUpCluster() throws Exception {
util = getTestingUtil(null);
Configuration conf = util.getConfiguration();
conf.setInt(HFile.FORMAT_VERSION_KEY, 3);
conf.set("hbase.coprocessor.master.classes", VisibilityController.class.getName());
conf.set("hbase.coprocessor.region.classes", VisibilityController.class.getName());
conf.set("hbase.superuser", "admin," + User.getCurrent().getName());
super.setUpCluster();
addLabels();
}
@Override
protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
long numKeys) {
String[] args = super.getArgsForLoadTestTool(mode, modeSpecificArg, startKey, numKeys);
List<String> tmp = new ArrayList<String>(Arrays.asList(args));
tmp.add(HIPHEN + LoadTestTool.OPT_GENERATOR);
StringBuilder sb = new StringBuilder(LoadTestDataGeneratorWithVisibilityLabels.class.getName());
sb.append(COLON);
sb.append(asCommaSeperatedString(VISIBILITY_EXPS));
sb.append(COLON);
String authorizationsStr = AUTHS.toString();
sb.append(authorizationsStr.substring(1, authorizationsStr.length() - 1));
tmp.add(sb.toString());
return tmp.toArray(new String[tmp.size()]);
}
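With the expressions and authorization sets defined above, the option appended here takes the form (abbreviated for readability):
-generator org.apache.hadoop.hbase.security.visibility.LoadTestDataGeneratorWithVisibilityLabels:secret & confidential & !private,topsecret | confidential,...:[secret, confidential], [topsecret],...
i.e. the generator class name, the comma separated visibility expressions, and the authorization sets, joined by colons.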
private static String asCommaSeperatedString(String[] list) {
StringBuilder sb = new StringBuilder();
for (String item : list) {
sb.append(item);
sb.append(COMMA);
}
if (sb.length() > 0) {
// Remove the trailing ,
sb.deleteCharAt(sb.length() - 1);
}
return sb.toString();
}
private void addLabels() throws Exception {
try {
VisibilityClient.addLabels(util.getConfiguration(), LABELS);
VisibilityClient.setAuths(util.getConfiguration(), LABELS, User.getCurrent().getName());
} catch (Throwable t) {
throw new IOException(t);
}
}
}

View File

@ -232,7 +232,7 @@ public class IntegrationTestLazyCfLoading {
LOG.info("Starting writer; the number of keys to write is " + keysToWrite);
// TODO : Need to see if tag support has to be given here in the integration test suite
writer.start(1, keysToWrite, WRITER_THREADS, false, 0, 0);
writer.start(1, keysToWrite, WRITER_THREADS);
// Now, do scans.
long now = EnvironmentEdgeManager.currentTimeMillis();

View File

@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.hbase.util.MultiThreadedAction;
import org.apache.hadoop.hbase.util.MultiThreadedReader;
import org.apache.hadoop.hbase.util.MultiThreadedWriter;
@ -204,7 +203,7 @@ public class StripeCompactionsPerformanceEvaluation extends AbstractHBaseTool {
if (preloadKeys > 0) {
MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, tn);
long time = System.currentTimeMillis();
preloader.start(0, startKey, writeThreads, false, 0, 0);
preloader.start(0, startKey, writeThreads);
preloader.waitForFinish();
if (preloader.getNumWriteFailures() > 0) {
throw new IOException("Preload failed");
@ -221,8 +220,8 @@ public class StripeCompactionsPerformanceEvaluation extends AbstractHBaseTool {
reader.linkToWriter(writer);
long testStartTime = System.currentTimeMillis();
writer.start(startKey, endKey, writeThreads, false, 0, 0);
reader.start(startKey, endKey, readThreads, /* rdmReadThreads, Long.MAX_VALUE, */ false, 0, 0);
writer.start(startKey, endKey, writeThreads);
reader.start(startKey, endKey, readThreads);
writer.waitForFinish();
reader.waitForFinish();
// reader.waitForVerification(300000);

View File

@ -46,10 +46,12 @@ class VisibilityLabelFilter extends FilterBase {
public ReturnCode filterKeyValue(Cell cell) throws IOException {
Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
boolean visibilityTagPresent = false;
while (tagsItr.hasNext()) {
boolean includeKV = true;
Tag tag = tagsItr.next();
if (tag.getType() == VisibilityUtils.VISIBILITY_TAG_TYPE) {
visibilityTagPresent = true;
int offset = tag.getTagOffset();
int endOffset = offset + tag.getTagLength();
while (offset < endOffset) {
@ -76,9 +78,8 @@ class VisibilityLabelFilter extends FilterBase {
// the result then.
return ReturnCode.INCLUDE;
}
return ReturnCode.SKIP;
}
}
return ReturnCode.INCLUDE;
return visibilityTagPresent ? ReturnCode.SKIP : ReturnCode.INCLUDE;
}
}

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.security.visibility;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.util.MultiThreadedAction.DefaultDataGenerator;
@InterfaceAudience.Private
public class LoadTestDataGeneratorWithVisibilityLabels extends DefaultDataGenerator {
private static final String COMMA = ",";
private String[] visibilityExps = null;
private String[][] authorizations = null;
public LoadTestDataGeneratorWithVisibilityLabels(int minValueSize, int maxValueSize,
int minColumnsPerKey, int maxColumnsPerKey, byte[]... columnFamilies) {
super(minValueSize, maxValueSize, minColumnsPerKey, maxColumnsPerKey, columnFamilies);
}
@Override
public void initialize(String[] args) {
if (args.length < 1 || args.length > 2) {
throw new IllegalArgumentException("LoadTestDataGeneratorWithVisibilityLabels can have "
+ "1 or 2 initialization arguments");
}
// 1st arg in args is supposed to be the visibilityExps to be used with Mutations.
String temp = args[0];
// This will be comma separated list of expressions.
this.visibilityExps = temp.split(COMMA);
// 2nd arg in args, if present, is supposed to be a comma separated set of authorizations to be
// used with Gets. Each set is comma separated within square brackets.
// Eg: [secret,private],[confidential,private],[public]
if (args.length == 2) {
this.authorizations = toAuthorizationsSet(args[1]);
}
}
private static String[][] toAuthorizationsSet(String authorizationsStr) {
// Eg: [secret,private],[confidential,private],[public]
String[] split = authorizationsStr.split("],");
String[][] result = new String[split.length][];
for (int i = 0; i < split.length; i++) {
String s = split[i].trim();
assert s.charAt(0) == '[';
s = s.substring(1);
if (i == split.length - 1) {
assert s.charAt(s.length() - 1) == ']';
s = s.substring(0, s.length() - 1);
}
String[] tmp = s.split(COMMA);
for (int j = 0; j < tmp.length; j++) {
tmp[j] = tmp[j].trim();
}
result[i] = tmp;
}
return result;
}
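A worked example of the parsing above: an input of "[secret, confidential], [topsecret]" is split on "],", trimmed, and stripped of brackets, yielding the two authorization sets { "secret", "confidential" } and { "topsecret" }.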
@Override
public Mutation beforeMutate(long rowkeyBase, Mutation m) throws IOException {
if (!(m instanceof Delete)) {
m.setCellVisibility(new CellVisibility(this.visibilityExps[(int) rowkeyBase
% this.visibilityExps.length]));
}
return m;
}
@Override
public Get beforeGet(long rowkeyBase, Get get) {
get.setAuthorizations(new Authorizations(
authorizations[(int) (rowkeyBase % authorizations.length)]));
return get;
}
}

View File

@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.MultiThreadedAction.DefaultDataGenerator;
@InterfaceAudience.Private
public class LoadTestDataGeneratorWithTags extends DefaultDataGenerator {
private int minNumTags, maxNumTags;
private int minTagLength, maxTagLength;
private Random random = new Random();
public LoadTestDataGeneratorWithTags(int minValueSize, int maxValueSize, int minColumnsPerKey,
int maxColumnsPerKey, byte[]... columnFamilies) {
super(minValueSize, maxValueSize, minColumnsPerKey, maxColumnsPerKey, columnFamilies);
}
@Override
public void initialize(String[] args) {
if (args.length != 4) {
throw new IllegalArgumentException("LoadTestDataGeneratorWithTags must have "
+ "4 initialization arguments. ie. minNumTags:maxNumTags:minTagLength:maxTagLength");
}
// 1st arg in args is the min number of tags to be used with every cell
this.minNumTags = Integer.parseInt(args[0]);
// 2nd arg in args is the max number of tags to be used with every cell
this.maxNumTags = Integer.parseInt(args[1]);
// 3rd arg in args is the min tag length
this.minTagLength = Integer.parseInt(args[2]);
// 4th arg in args is the max tag length
this.maxTagLength = Integer.parseInt(args[3]);
}
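For example, initialize(new String[] { "1", "10", "16", "512" }) — the values IntegrationTestIngestWithTags passes above — sets minNumTags=1, maxNumTags=10, minTagLength=16 and maxTagLength=512.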
@Override
public Mutation beforeMutate(long rowkeyBase, Mutation m) throws IOException {
if (m instanceof Put) {
List<Cell> updatedCells = new ArrayList<Cell>();
int numTags;
if (minNumTags == maxNumTags) {
numTags = minNumTags;
} else {
numTags = minNumTags + random.nextInt(maxNumTags - minNumTags);
}
List<Tag> tags;
for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
Cell cell = cellScanner.current();
byte[] tag = LoadTestTool.generateData(random,
minTagLength + random.nextInt(maxTagLength - minTagLength));
tags = new ArrayList<Tag>();
for (int n = 0; n < numTags; n++) {
tags.add(new Tag((byte) 127, tag));
}
Cell updatedCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(),
cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength(), cell.getTimestamp(), Type.codeToType(cell.getTypeByte()),
cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), tags);
updatedCells.add(updatedCell);
}
m.getFamilyCellMap().clear();
// Clear and add new Cells to the Mutation.
for (Cell cell : updatedCells) {
((Put) m).add(cell);
}
}
return m;
}
}

View File

@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
@ -56,6 +57,7 @@ import org.apache.hadoop.util.ToolRunner;
public class LoadTestTool extends AbstractHBaseTool {
private static final Log LOG = LogFactory.getLog(LoadTestTool.class);
private static final String COLON = ":";
/** Table name for the test */
private TableName tableName;
@ -107,13 +109,10 @@ public class LoadTestTool extends AbstractHBaseTool {
public static final String OPT_INMEMORY = "in_memory";
public static final String OPT_USAGE_IN_MEMORY = "Tries to keep the HFiles of the CF " +
"inmemory as far as possible. Not guaranteed that reads are always served from inmemory";
public static final String OPT_USETAGS = "usetags";
public static final String OPT_USAGE_USETAG = "Adds tags with every KV. This option would be used" +
" only if the HFileV3 version is used";
public static final String OPT_NUM_TAGS = "num_tags";
public static final String OPT_USAGE_NUM_TAGS = "Specifies the minimum and number of tags to be"
+ " added per KV";
public static final String OPT_GENERATOR = "generator";
public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool."
+ " Any args for this class can be passed as colon separated after class name";
protected static final String OPT_KEY_WINDOW = "key_window";
protected static final String OPT_WRITE = "write";
@ -153,9 +152,6 @@ public class LoadTestTool extends AbstractHBaseTool {
protected Compression.Algorithm compressAlgo;
protected BloomType bloomType;
private boolean inMemoryCF;
private boolean useTags;
private int minNumTags = 1;
private int maxNumTags = 1;
// Writer options
protected int numWriterThreads = DEFAULT_NUM_THREADS;
protected int minColsPerKey, maxColsPerKey;
@ -185,7 +181,7 @@ public class LoadTestTool extends AbstractHBaseTool {
protected String[] splitColonSeparated(String option,
int minNumCols, int maxNumCols) {
String optVal = cmd.getOptionValue(option);
String[] cols = optVal.split(":");
String[] cols = optVal.split(COLON);
if (cols.length < minNumCols || cols.length > maxNumCols) {
throw new IllegalArgumentException("Expected at least "
+ minNumCols + " columns but no more than " + maxNumCols +
@ -269,8 +265,7 @@ public class LoadTestTool extends AbstractHBaseTool {
addOptNoArg(OPT_BATCHUPDATE, "Whether to use batch as opposed to " +
"separate updates for every column in a row");
addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY);
addOptNoArg(OPT_USETAGS, OPT_USAGE_USETAG);
addOptWithArg(OPT_NUM_TAGS, OPT_USAGE_NUM_TAGS + " The default is 1:1");
addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE);
addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
addOptWithArg(OPT_START_KEY, "The first key to read/write " +
@ -406,24 +401,9 @@ public class LoadTestTool extends AbstractHBaseTool {
BloomType.valueOf(bloomStr);
inMemoryCF = cmd.hasOption(OPT_INMEMORY);
useTags = cmd.hasOption(OPT_USETAGS);
if (useTags) {
if (cmd.hasOption(OPT_NUM_TAGS)) {
String[] readOpts = splitColonSeparated(OPT_NUM_TAGS, 1, 2);
int colIndex = 0;
minNumTags = parseInt(readOpts[colIndex++], 1, 100);
if (colIndex < readOpts.length) {
maxNumTags = parseInt(readOpts[colIndex++], 1, 100);
}
}
System.out.println("Using tags, number of tags per KV: min=" + minNumTags + ", max="
+ maxNumTags);
}
if (cmd.hasOption(OPT_ENCRYPTION)) {
cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION));
}
}
public void initTestTable() throws IOException {
@ -456,8 +436,18 @@ public class LoadTestTool extends AbstractHBaseTool {
initTestTable();
}
LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(
minColDataSize, maxColDataSize, minColsPerKey, maxColsPerKey, COLUMN_FAMILY);
LoadTestDataGenerator dataGen = null;
if (cmd.hasOption(OPT_GENERATOR)) {
String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON);
dataGen = getLoadGeneratorInstance(clazzAndArgs[0]);
String[] args = clazzAndArgs.length == 1 ? new String[0] : Arrays.copyOfRange(clazzAndArgs,
1, clazzAndArgs.length);
dataGen.initialize(args);
} else {
// Default DataGenerator is MultiThreadedAction.DefaultDataGenerator
dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize,
minColsPerKey, maxColsPerKey, COLUMN_FAMILY);
}
if (isWrite) {
writerThreads = new MultiThreadedWriter(dataGen, conf, tableName);
@ -489,7 +479,7 @@ public class LoadTestTool extends AbstractHBaseTool {
if (isWrite) {
System.out.println("Starting to write data...");
writerThreads.start(startKey, endKey, numWriterThreads, useTags, minNumTags, maxNumTags);
writerThreads.start(startKey, endKey, numWriterThreads);
}
if (isUpdate) {
@ -497,12 +487,12 @@ public class LoadTestTool extends AbstractHBaseTool {
System.out.println("Starting to mutate data...");
// TODO : currently append and increment operations not tested with tags
// Will update this after it is done
updaterThreads.start(startKey, endKey, numUpdaterThreads, true, minNumTags, maxNumTags);
updaterThreads.start(startKey, endKey, numUpdaterThreads);
}
if (isRead) {
System.out.println("Starting to read data...");
readerThreads.start(startKey, endKey, numReaderThreads, useTags, 0, 0);
readerThreads.start(startKey, endKey, numReaderThreads);
}
if (isWrite) {
@ -531,6 +521,18 @@ public class LoadTestTool extends AbstractHBaseTool {
return success ? EXIT_SUCCESS : EXIT_FAILURE;
}
private LoadTestDataGenerator getLoadGeneratorInstance(String clazzName) throws IOException {
try {
Class<?> clazz = Class.forName(clazzName);
Constructor<?> constructor = clazz.getConstructor(int.class, int.class, int.class, int.class,
byte[][].class);
return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize,
minColsPerKey, maxColsPerKey, COLUMN_FAMILIES);
} catch (Exception e) {
throw new IOException(e);
}
}
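A usage sketch of the new -generator plumbing (the table name, key count and option set here are illustrative, and loadTool is assumed to already have its Configuration set, as the integration tests above do):
int ret = loadTool.run(new String[] { "-tn", "loadtest", "-write", "10:1024:10",
    "-num_keys", "100000",
    "-generator", "org.apache.hadoop.hbase.util.LoadTestDataGeneratorWithTags:1:10:16:512" });
The tool reflectively instantiates the named class through its (int, int, int, int, byte[][]) constructor and hands the remaining colon separated pieces to initialize().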
static byte[] generateData(final Random r, int length) {
byte [] b = new byte [length];
int i = 0;

View File

@ -68,10 +68,6 @@ public abstract class MultiThreadedAction {
protected AtomicLong totalOpTimeMs = new AtomicLong();
protected boolean verbose = false;
protected boolean useTags = false;
protected int minNumTags = 1;
protected int maxNumTags = 1;
protected LoadTestDataGenerator dataGenerator = null;
/**
@ -153,14 +149,10 @@ public abstract class MultiThreadedAction {
this.actionLetter = actionLetter;
}
public void start(long startKey, long endKey, int numThreads, boolean useTags, int minNumTags,
int maxNumTags) throws IOException {
public void start(long startKey, long endKey, int numThreads) throws IOException {
this.startKey = startKey;
this.endKey = endKey;
this.numThreads = numThreads;
this.useTags = useTags;
this.minNumTags = minNumTags;
this.maxNumTags = maxNumTags;
(new Thread(new ProgressReporter(actionLetter))).start();
}

View File

@ -91,9 +91,8 @@ public class MultiThreadedReader extends MultiThreadedAction
}
@Override
public void start(long startKey, long endKey, int numThreads, boolean useTags,
int minNumTags, int maxNumTags) throws IOException {
super.start(startKey, endKey, numThreads, useTags, minNumTags, maxNumTags);
public void start(long startKey, long endKey, int numThreads) throws IOException {
super.start(startKey, endKey, numThreads);
if (verbose) {
LOG.debug("Reading keys [" + startKey + ", " + endKey + ")");
}
@ -235,6 +234,7 @@ public class MultiThreadedReader extends MultiThreadedAction
}
try {
get = dataGenerator.beforeGet(keyToRead, get);
if (verbose) {
LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
}

View File

@ -79,9 +79,8 @@ public class MultiThreadedUpdater extends MultiThreadedWriterBase {
}
@Override
public void start(long startKey, long endKey, int numThreads, boolean useTags, int minNumTags,
int maxNumTags) throws IOException {
super.start(startKey, endKey, numThreads, useTags, minNumTags, maxNumTags);
public void start(long startKey, long endKey, int numThreads) throws IOException {
super.start(startKey, endKey, numThreads);
if (verbose) {
LOG.debug("Updating keys [" + startKey + ", " + endKey + ")");
@ -156,6 +155,7 @@ public class MultiThreadedUpdater extends MultiThreadedWriterBase {
try {
Get get = new Get(rowKey);
get.addFamily(cf);
get = dataGenerator.beforeGet(rowKeyBase, get);
result = table.get(get);
} catch (IOException ie) {
LOG.warn("Failed to get the row for key = ["
@ -257,6 +257,7 @@ public class MultiThreadedUpdater extends MultiThreadedWriterBase {
long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) {
long start = System.currentTimeMillis();
try {
m = dataGenerator.beforeMutate(keyBase, m);
if (m instanceof Increment) {
table.increment((Increment)m);
} else if (m instanceof Append) {

View File

@ -24,7 +24,6 @@ import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INF
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import org.apache.commons.logging.Log;
@ -32,7 +31,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
@ -45,11 +43,6 @@ public class MultiThreadedWriter extends MultiThreadedWriterBase {
private boolean isMultiPut = false;
private Random random = new Random();
// TODO: Make this configurable
private int minTagLength = 16;
private int maxTagLength = 512;
public MultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf,
TableName tableName) {
super(dataGen, conf, tableName, "W");
@ -61,9 +54,8 @@ public class MultiThreadedWriter extends MultiThreadedWriterBase {
}
@Override
public void start(long startKey, long endKey, int numThreads, boolean useTags,
int minNumTags, int maxNumTags) throws IOException {
super.start(startKey, endKey, numThreads, useTags, minNumTags, maxNumTags);
public void start(long startKey, long endKey, int numThreads) throws IOException {
super.start(startKey, endKey, numThreads);
if (verbose) {
LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")");
@ -96,26 +88,9 @@ public class MultiThreadedWriter extends MultiThreadedWriterBase {
int columnCount = 0;
for (byte[] cf : columnFamilies) {
byte[][] columns = dataGenerator.generateColumnsForCf(rowKey, cf);
int numTags;
if (minNumTags == maxNumTags) {
numTags = minNumTags;
} else {
numTags = minNumTags + random.nextInt(maxNumTags - minNumTags);
}
Tag[] tags = new Tag[numTags];
for (byte[] column : columns) {
byte[] value = dataGenerator.generateValue(rowKey, cf, column);
byte[] tag = LoadTestTool.generateData(random,
minTagLength + random.nextInt(maxTagLength - minTagLength));
if(useTags) {
for (int n = 0; n < numTags; n++) {
Tag t = new Tag((byte) n, tag);
tags[n] = t;
}
put.add(cf, column, value, tags);
} else {
put.add(cf, column, value);
}
put.add(cf, column, value);
++columnCount;
if (!isMultiPut) {
insert(table, put, rowKeyBase);

View File

@ -83,10 +83,8 @@ public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
}
@Override
public void start(long startKey, long endKey, int numThreads, boolean useTags, int minNumTags,
int maxNumTags) throws IOException {
super.start(startKey, endKey, numThreads, useTags, minNumTags, maxNumTags);
public void start(long startKey, long endKey, int numThreads) throws IOException {
super.start(startKey, endKey, numThreads);
nextKeyToWrite.set(startKey);
wroteUpToKey.set(startKey - 1);
@ -176,6 +174,7 @@ public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
public void insert(HTable table, Put put, long keyBase) {
long start = System.currentTimeMillis();
try {
put = (Put) dataGenerator.beforeMutate(keyBase, put);
table.put(put);
totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
} catch (IOException e) {

View File

@ -81,7 +81,7 @@ public class RestartMetaTest extends AbstractHBaseTool {
minColDataSize, maxColDataSize, minColsPerKey, maxColsPerKey, LoadTestTool.COLUMN_FAMILY);
MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
writer.setMultiPut(true);
writer.start(startKey, endKey, numThreads, false, 0, 0);
writer.start(startKey, endKey, numThreads);
System.out.printf("Started loading data...");
writer.waitForFinish();
System.out.printf("Finished loading data...");

View File

@ -45,8 +45,8 @@ public class TestMiniClusterLoadParallel
readerThreads.linkToWriter(writerThreads);
writerThreads.start(0, numKeys, NUM_THREADS, false, 0, 0);
readerThreads.start(0, numKeys, NUM_THREADS, false, 0, 0);
writerThreads.start(0, numKeys, NUM_THREADS);
readerThreads.start(0, numKeys, NUM_THREADS);
writerThreads.waitForFinish();
readerThreads.waitForFinish();

View File

@ -129,11 +129,11 @@ public class TestMiniClusterLoadSequential {
}
protected void runLoadTestOnExistingTable() throws IOException {
writerThreads.start(0, numKeys, NUM_THREADS, false, 0, 0);
writerThreads.start(0, numKeys, NUM_THREADS);
writerThreads.waitForFinish();
assertEquals(0, writerThreads.getNumWriteFailures());
readerThreads.start(0, numKeys, NUM_THREADS, false, 0, 0);
readerThreads.start(0, numKeys, NUM_THREADS);
readerThreads.waitForFinish();
assertEquals(0, readerThreads.getNumReadFailures());
assertEquals(0, readerThreads.getNumReadErrors());

View File

@ -16,9 +16,12 @@
*/
package org.apache.hadoop.hbase.util.test;
import java.io.IOException;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
/**
* A generator of random data (keys/cfs/columns/values) for load testing.
@ -26,7 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
*/
@InterfaceAudience.Private
public abstract class LoadTestDataGenerator {
protected final LoadTestKVGenerator kvGenerator;
protected LoadTestKVGenerator kvGenerator;
// The mutate info column stores information
// about update done to this column family this row.
@ -36,6 +39,10 @@ public abstract class LoadTestDataGenerator {
// which can be incremented later on during updates.
public final static byte[] INCREMENT = "increment".getBytes();
public LoadTestDataGenerator() {
}
/**
* Initializes the object.
* @param minValueSize minimum size of the value generated by
@ -47,6 +54,14 @@ public abstract class LoadTestDataGenerator {
this.kvGenerator = new LoadTestKVGenerator(minValueSize, maxValueSize);
}
/**
* initialize the LoadTestDataGenerator
* @param args init args
*/
public void initialize(String[] args) {
}
/**
* Generates a deterministic, unique hashed row key from a number. That way, the user can
* keep track of numbers, without messing with byte array and ensuring key distribution.
@ -97,4 +112,26 @@ public abstract class LoadTestDataGenerator {
* @return True iff valid.
*/
public abstract boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value);
/**
* Giving a chance for the LoadTestDataGenerator to change the Mutation load.
* @param rowkeyBase
* @param m
* @return updated Mutation
* @throws IOException
*/
public Mutation beforeMutate(long rowkeyBase, Mutation m) throws IOException {
return m;
}
/**
* Giving a chance for the LoadTestDataGenerator to change the Get load.
* @param rowkeyBase
* @param get
* @return updated Get
* @throws IOException
*/
public Get beforeGet(long rowkeyBase, Get get) throws IOException {
return get;
}
}
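A minimal sketch of a custom generator built on the new hooks (illustrative only, not part of this commit; imports omitted). LoadTestTool would instantiate it through the (int, int, int, int, byte[][]) constructor and pass everything after the class name in the -generator option to initialize():
public class FixedVisibilityDataGenerator extends MultiThreadedAction.DefaultDataGenerator {
  private String visExp;
  public FixedVisibilityDataGenerator(int minValueSize, int maxValueSize,
      int minColumnsPerKey, int maxColumnsPerKey, byte[]... columnFamilies) {
    super(minValueSize, maxValueSize, minColumnsPerKey, maxColumnsPerKey, columnFamilies);
  }
  @Override
  public void initialize(String[] args) {
    this.visExp = args[0]; // e.g. "-generator <this class>:secret|topsecret"
  }
  @Override
  public Mutation beforeMutate(long rowkeyBase, Mutation m) throws IOException {
    if (!(m instanceof Delete)) { // mirror LoadTestDataGeneratorWithVisibilityLabels above
      m.setCellVisibility(new CellVisibility(visExp));
    }
    return m;
  }
}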