HBASE-9846-Integration test and LoadTestTool support for cell ACLs(Ram)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1556866 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
ramkrishna 2014-01-09 16:45:26 +00:00
parent 5a1f9d2e1e
commit 3cc835b7e1
17 changed files with 1227 additions and 143 deletions

View File

@ -0,0 +1,179 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security.access;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GrantRequest;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GrantResponse;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.RevokeRequest;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.RevokeResponse;
import com.google.protobuf.ByteString;
import com.google.protobuf.ZeroCopyLiteralByteString;
/**
* Utility client for doing access control admin operations.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class AccessControlClient {
/**
* Grants permission on the specified table for the specified user
* @param conf
* @param tableName
* @param userName
* @param family
* @param qual
* @param actions
* @return GrantResponse
* @throws Throwable
*/
public static GrantResponse grant(Configuration conf, final TableName tableName,
final String userName, final byte[] family, final byte[] qual,
final AccessControlProtos.Permission.Action... actions) throws Throwable {
HTable ht = null;
try {
TableName aclTableName =
TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "acl");
ht = new HTable(conf, aclTableName.getName());
Batch.Call<AccessControlService, GrantResponse> callable =
new Batch.Call<AccessControlService, GrantResponse>() {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<GrantResponse> rpcCallback =
new BlockingRpcCallback<GrantResponse>();
@Override
public GrantResponse call(AccessControlService service) throws IOException {
GrantRequest.Builder builder = GrantRequest.newBuilder();
AccessControlProtos.Permission.Builder ret =
AccessControlProtos.Permission.newBuilder();
AccessControlProtos.TablePermission.Builder permissionBuilder =
AccessControlProtos.TablePermission
.newBuilder();
for (AccessControlProtos.Permission.Action a : actions) {
permissionBuilder.addAction(a);
}
permissionBuilder.setTableName(ProtobufUtil.toProtoTableName(tableName));
if (family != null) {
permissionBuilder.setFamily(ZeroCopyLiteralByteString.wrap(family));
}
if (qual != null) {
permissionBuilder.setQualifier(ZeroCopyLiteralByteString.wrap(qual));
}
ret.setType(AccessControlProtos.Permission.Type.Table).setTablePermission(
permissionBuilder);
builder.setUserPermission(AccessControlProtos.UserPermission.newBuilder()
.setUser(ByteString.copyFromUtf8(userName)).setPermission(ret));
service.grant(controller, builder.build(), rpcCallback);
return rpcCallback.get();
}
};
Map<byte[], GrantResponse> result = ht.coprocessorService(AccessControlService.class,
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, callable);
return result.values().iterator().next(); // There will be exactly one
// region for labels
// table and so one entry in
// result Map.
} finally {
if (ht != null) {
ht.close();
}
}
}
/**
* Revokes the permission on the table
* @param conf
* @param username
* @param tableName
* @param family
* @param qualifier
* @param actions
* @return RevokeResponse
* @throws Throwable
*/
public static RevokeResponse revoke(Configuration conf, final String username,
final TableName tableName, final byte[] family, final byte[] qualifier,
final AccessControlProtos.Permission.Action... actions) throws Throwable {
HTable ht = null;
try {
TableName aclTableName = TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR,
"acl");
ht = new HTable(conf, aclTableName.getName());
Batch.Call<AccessControlService, AccessControlProtos.RevokeResponse> callable =
new Batch.Call<AccessControlService, AccessControlProtos.RevokeResponse>() {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<AccessControlProtos.RevokeResponse> rpcCallback =
new BlockingRpcCallback<AccessControlProtos.RevokeResponse>();
@Override
public RevokeResponse call(AccessControlService service) throws IOException {
AccessControlProtos.Permission.Builder ret =
AccessControlProtos.Permission.newBuilder();
AccessControlProtos.TablePermission.Builder permissionBuilder =
AccessControlProtos.TablePermission.newBuilder();
for (AccessControlProtos.Permission.Action a : actions) {
permissionBuilder.addAction(a);
}
if (tableName != null) {
permissionBuilder.setTableName(ProtobufUtil.toProtoTableName(tableName));
}
if (family != null) {
permissionBuilder.setFamily(ZeroCopyLiteralByteString.wrap(family));
}
if (qualifier != null) {
permissionBuilder.setQualifier(ZeroCopyLiteralByteString.wrap(qualifier));
}
ret.setType(AccessControlProtos.Permission.Type.Table).setTablePermission(
permissionBuilder);
RevokeRequest builder = AccessControlProtos.RevokeRequest
.newBuilder()
.setUserPermission(
AccessControlProtos.UserPermission.newBuilder()
.setUser(ByteString.copyFromUtf8(username)).setPermission(ret)).build();
service.revoke(controller, builder, rpcCallback);
return rpcCallback.get();
}
};
Map<byte[], RevokeResponse> result = ht.coprocessorService(AccessControlService.class,
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, callable);
return result.values().iterator().next();
} finally {
if (ht != null) {
ht.close();
}
}
}
}

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.security.access.AccessController;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL;
import org.apache.hadoop.util.ToolRunner;
import org.junit.experimental.categories.Category;
/**
* /**
* An Integration class for tests that does something with the cluster while running
* {@link LoadTestTool} to write and verify some data.
* Verifies whether cells for users with only WRITE permissions are not read back
* and cells with READ permissions are read back.
* Every operation happens in the user's specific context
*/
@Category(IntegrationTests.class)
public class IntegrationTestIngestWithACL extends IntegrationTestIngest {
private static final char COLON = ':';
public static final char HYPHEN = '-';
private static final char COMMA = ',';
private static final int SPECIAL_PERM_CELL_INSERTION_FACTOR = 100;
public static final String[] userNames = { "user1", "user2", "user3", "user4" };
@Override
public void setUpCluster() throws Exception {
util = getTestingUtil(null);
Configuration conf = util.getConfiguration();
conf.setInt(HFile.FORMAT_VERSION_KEY, 3);
conf.set("hbase.coprocessor.master.classes", AccessController.class.getName());
conf.set("hbase.coprocessor.region.classes", AccessController.class.getName());
// conf.set("hbase.superuser", "admin");
super.setUpCluster();
}
@Override
protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
long numKeys) {
String[] args = super.getArgsForLoadTestTool(mode, modeSpecificArg, startKey, numKeys);
List<String> tmp = new ArrayList<String>(Arrays.asList(args));
tmp.add(HYPHEN + LoadTestTool.OPT_GENERATOR);
StringBuilder sb = new StringBuilder(LoadTestDataGeneratorWithACL.class.getName());
sb.append(COLON);
sb.append(asCommaSeperatedString(userNames));
sb.append(COLON);
sb.append(Integer.toString(SPECIAL_PERM_CELL_INSERTION_FACTOR));
tmp.add(sb.toString());
return tmp.toArray(new String[tmp.size()]);
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
IntegrationTestingUtility.setUseDistributedCluster(conf);
int ret = ToolRunner.run(conf, new IntegrationTestIngestWithACL(), args);
System.exit(ret);
}
private static String asCommaSeperatedString(String[] list) {
StringBuilder sb = new StringBuilder();
for (String item : list) {
sb.append(item);
sb.append(COMMA);
}
if (sb.length() > 0) {
// Remove the trailing ,
sb.deleteCharAt(sb.length() - 1);
}
return sb.toString();
}
}

View File

@ -18,9 +18,12 @@
package org.apache.hadoop.hbase.security.access;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -34,11 +37,9 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import java.io.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
/**
* Performs authorization checks for a given user's assigned permissions

View File

@ -38,6 +38,7 @@ public class LoadTestDataGeneratorWithVisibilityLabels extends DefaultDataGenera
@Override
public void initialize(String[] args) {
super.initialize(args);
if (args.length < 1 || args.length > 2) {
throw new IllegalArgumentException("LoadTestDataGeneratorWithVisibilityLabels can have "
+ "1 or 2 initialization arguments");

View File

@ -45,6 +45,7 @@ public class LoadTestDataGeneratorWithTags extends DefaultDataGenerator {
@Override
public void initialize(String[] args) {
super.initialize(args);
if (args.length != 4) {
throw new IllegalArgumentException("LoadTestDataGeneratorWithTags must have "
+ "4 initialization arguments. ie. minNumTags:maxNumTags:minTagLength:maxTagLength");

View File

@ -39,14 +39,18 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessControlClient;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL;
import org.apache.hadoop.util.ToolRunner;
/**
@ -152,6 +156,8 @@ public class LoadTestTool extends AbstractHBaseTool {
protected Compression.Algorithm compressAlgo;
protected BloomType bloomType;
private boolean inMemoryCF;
private User userOwner;
// Writer options
protected int numWriterThreads = DEFAULT_NUM_THREADS;
protected int minColsPerKey, maxColsPerKey;
@ -435,11 +441,14 @@ public class LoadTestTool extends AbstractHBaseTool {
if (!isSkipInit) {
initTestTable();
}
LoadTestDataGenerator dataGen = null;
if (cmd.hasOption(OPT_GENERATOR)) {
String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON);
dataGen = getLoadGeneratorInstance(clazzAndArgs[0]);
if(dataGen instanceof LoadTestDataGeneratorWithACL) {
LOG.info("ACL is on");
userOwner = User.createUserForTesting(conf, "owner", new String[0]);
}
String[] args = clazzAndArgs.length == 1 ? new String[0] : Arrays.copyOfRange(clazzAndArgs,
1, clazzAndArgs.length);
dataGen.initialize(args);
@ -449,18 +458,50 @@ public class LoadTestTool extends AbstractHBaseTool {
minColsPerKey, maxColsPerKey, COLUMN_FAMILY);
}
if(userOwner != null) {
conf.set("hadoop.security.authorization", "false");
conf.set("hadoop.security.authentication", "simple");
LOG.info("Granting permission for the user " + userOwner.getShortName());
HTable table = new HTable(conf, tableName);
AccessControlProtos.Permission.Action[] actions = {
AccessControlProtos.Permission.Action.ADMIN,
AccessControlProtos.Permission.Action.CREATE, AccessControlProtos.Permission.Action.READ,
AccessControlProtos.Permission.Action.WRITE };
try {
AccessControlClient.grant(conf, table.getName(), userOwner.getShortName(), COLUMN_FAMILY,
null, actions);
} catch (Throwable e) {
LOG.fatal("Error in granting permission for the user " + userOwner.getShortName(), e);
return EXIT_FAILURE;
}
}
if (isWrite) {
writerThreads = new MultiThreadedWriter(dataGen, conf, tableName);
if (userOwner != null) {
writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner);
} else {
writerThreads = new MultiThreadedWriter(dataGen, conf, tableName);
}
writerThreads.setMultiPut(isMultiPut);
}
if (isUpdate) {
updaterThreads = new MultiThreadedUpdater(dataGen, conf, tableName, updatePercent);
if (userOwner != null) {
updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent,
userOwner);
} else {
updaterThreads = new MultiThreadedUpdater(dataGen, conf, tableName, updatePercent);
}
updaterThreads.setBatchUpdate(isBatchUpdate);
}
if (isRead) {
readerThreads = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent);
if (userOwner != null) {
readerThreads = new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent);
} else {
readerThreads = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent);
}
readerThreads.setMaxErrors(maxReadErrors);
readerThreads.setKeyWindow(keyWindow);
}
@ -533,7 +574,7 @@ public class LoadTestTool extends AbstractHBaseTool {
}
}
static byte[] generateData(final Random r, int length) {
public static byte[] generateData(final Random r, int length) {
byte [] b = new byte [length];
int i = 0;

View File

@ -37,11 +37,11 @@ public class MultiThreadedReader extends MultiThreadedAction
{
private static final Log LOG = LogFactory.getLog(MultiThreadedReader.class);
private Set<HBaseReaderThread> readers = new HashSet<HBaseReaderThread>();
protected Set<HBaseReaderThread> readers = new HashSet<HBaseReaderThread>();
private final double verifyPercent;
private volatile boolean aborted;
private MultiThreadedWriterBase writer = null;
protected MultiThreadedWriterBase writer = null;
/**
* The number of keys verified in a sequence. This will never be larger than
@ -65,8 +65,9 @@ public class MultiThreadedReader extends MultiThreadedAction
public static final int DEFAULT_KEY_WINDOW = 0;
protected AtomicLong numKeysVerified = new AtomicLong(0);
private AtomicLong numReadErrors = new AtomicLong(0);
private AtomicLong numReadFailures = new AtomicLong(0);
protected AtomicLong numReadErrors = new AtomicLong(0);
protected AtomicLong numReadFailures = new AtomicLong(0);
protected AtomicLong nullResult = new AtomicLong(0);
private int maxErrors = DEFAULT_MAX_ERRORS;
private int keyWindow = DEFAULT_KEY_WINDOW;
@ -97,22 +98,26 @@ public class MultiThreadedReader extends MultiThreadedAction
LOG.debug("Reading keys [" + startKey + ", " + endKey + ")");
}
addReaderThreads(numThreads);
startThreads(readers);
}
protected void addReaderThreads(int numThreads) throws IOException {
for (int i = 0; i < numThreads; ++i) {
HBaseReaderThread reader = new HBaseReaderThread(i);
readers.add(reader);
}
startThreads(readers);
}
public class HBaseReaderThread extends Thread {
private final int readerId;
private final HTable table;
protected final int readerId;
protected final HTable table;
/** The "current" key being read. Increases from startKey to endKey. */
private long curKey;
/** Time when the thread started */
private long startTimeMs;
protected long startTimeMs;
/** If we are ahead of the writer and reading a random key. */
private boolean readingRandomKey;
@ -123,24 +128,34 @@ public class MultiThreadedReader extends MultiThreadedAction
*/
public HBaseReaderThread(int readerId) throws IOException {
this.readerId = readerId;
table = new HTable(conf, tableName);
table = createTable();
setName(getClass().getSimpleName() + "_" + readerId);
}
protected HTable createTable() throws IOException {
return new HTable(conf, tableName);
}
@Override
public void run() {
try {
runReader();
} finally {
try {
table.close();
} catch (IOException e) {
LOG.error("Error closing table", e);
}
closeTable();
numThreadsWorking.decrementAndGet();
}
}
protected void closeTable() {
try {
if (table != null) {
table.close();
}
} catch (IOException e) {
LOG.error("Error closing table", e);
}
}
private void runReader() {
if (verbose) {
LOG.info("Started thread #" + readerId + " for reads...");
@ -238,7 +253,7 @@ public class MultiThreadedReader extends MultiThreadedAction
if (verbose) {
LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
}
queryKey(get, RandomUtils.nextInt(100) < verifyPercent);
queryKey(get, RandomUtils.nextInt(100) < verifyPercent, keyToRead);
} catch (IOException e) {
numReadFailures.addAndGet(1);
LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
@ -248,12 +263,18 @@ public class MultiThreadedReader extends MultiThreadedAction
return get;
}
public void queryKey(Get get, boolean verify) throws IOException {
public void queryKey(Get get, boolean verify, long keyToRead) throws IOException {
String rowKey = Bytes.toString(get.getRow());
// read the data
long start = System.currentTimeMillis();
Result result = table.get(get);
getResultMetricUpdation(verify, rowKey, start, result, table, false);
}
protected void getResultMetricUpdation(boolean verify, String rowKey, long start,
Result result, HTable table, boolean isNullExpected)
throws IOException {
totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
numKeys.addAndGet(1);
if (!result.isEmpty()) {
@ -265,9 +286,13 @@ public class MultiThreadedReader extends MultiThreadedAction
Bytes.toBytes(rowKey));
LOG.info("Key = " + rowKey + ", RegionServer: "
+ hloc.getHostname());
if(isNullExpected) {
nullResult.incrementAndGet();
LOG.debug("Null result obtained for the key ="+rowKey);
return;
}
}
boolean isOk = verifyResultAgainstDataGenerator(result, verify);
boolean isOk = verifyResultAgainstDataGenerator(result, verify, false);
long numErrorsAfterThis = 0;
if (isOk) {
long cols = 0;
@ -306,12 +331,17 @@ public class MultiThreadedReader extends MultiThreadedAction
return numUniqueKeysVerified.get();
}
public long getNullResultsCount() {
return nullResult.get();
}
@Override
protected String progressInfo() {
StringBuilder sb = new StringBuilder();
appendToStatus(sb, "verified", numKeysVerified.get());
appendToStatus(sb, "READ FAILURES", numReadFailures.get());
appendToStatus(sb, "READ ERRORS", numReadErrors.get());
appendToStatus(sb, "NULL RESULT", nullResult.get());
return sb.toString();
}
}

View File

@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.security.UserGroupInformation;
/**
* A MultiThreadReader that helps to work with ACL
*/
public class MultiThreadedReaderWithACL extends MultiThreadedReader {
private static final Log LOG = LogFactory.getLog(MultiThreadedReaderWithACL.class);
private static final String COMMA = ",";
/**
* Maps user with Table instance. Because the table instance has to be created
* per user inorder to work in that user's context
*/
private Map<String, HTable> userVsTable = new HashMap<String, HTable>();
private Map<String, User> users = new HashMap<String, User>();
private String[] userNames;
public MultiThreadedReaderWithACL(LoadTestDataGenerator dataGen, Configuration conf,
TableName tableName, double verifyPercent) {
super(dataGen, conf, tableName, verifyPercent);
userNames = dataGenerator.getArgs()[0].split(COMMA);
}
@Override
protected void addReaderThreads(int numThreads) throws IOException {
for (int i = 0; i < numThreads; ++i) {
HBaseReaderThread reader = new HBaseReaderThreadWithACL(i);
readers.add(reader);
}
}
public class HBaseReaderThreadWithACL extends HBaseReaderThread {
public HBaseReaderThreadWithACL(int readerId) throws IOException {
super(readerId);
}
@Override
protected HTable createTable() throws IOException {
return null;
}
@Override
protected void closeTable() {
for (HTable table : userVsTable.values()) {
try {
table.close();
} catch (Exception e) {
LOG.error("Error while closing the table " + table.getName(), e);
}
}
}
@Override
public void queryKey(final Get get, final boolean verify, final long keyToRead)
throws IOException {
final String rowKey = Bytes.toString(get.getRow());
// read the data
final long start = System.currentTimeMillis();
PrivilegedExceptionAction<Object> action = new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
HTable localTable = null;
try {
get.setACLStrategy(true);
Result result = null;
int specialPermCellInsertionFactor = Integer.parseInt(dataGenerator.getArgs()[1]);
int mod = ((int) keyToRead % userNames.length);
if (userVsTable.get(userNames[mod]) == null) {
localTable = new HTable(conf, tableName);
userVsTable.put(userNames[mod], localTable);
result = localTable.get(get);
} else {
localTable = userVsTable.get(userNames[mod]);
result = localTable.get(get);
}
boolean isNullExpected = ((((int) keyToRead % specialPermCellInsertionFactor)) == 0);
LOG.info("Read happening from ACL " + isNullExpected);
getResultMetricUpdation(verify, rowKey, start, result, localTable, isNullExpected);
} catch (IOException e) {
recordFailure(keyToRead);
}
return null;
}
};
if (userNames != null && userNames.length > 0) {
int mod = ((int) keyToRead % userNames.length);
User user;
if(!users.containsKey(userNames[mod])) {
UserGroupInformation realUserUgi = UserGroupInformation.createRemoteUser(userNames[mod]);
user = User.create(realUserUgi);
users.put(userNames[mod], user);
} else {
user = users.get(userNames[mod]);
}
try {
user.runAs(action);
} catch (Exception e) {
recordFailure(keyToRead);
}
}
}
private void recordFailure(final long keyToRead) {
numReadFailures.addAndGet(1);
LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") + ", "
+ "time from start: " + (System.currentTimeMillis() - startTimeMs) + " ms");
}
}
}

View File

@ -56,7 +56,7 @@ import com.google.common.base.Preconditions;
public class MultiThreadedUpdater extends MultiThreadedWriterBase {
private static final Log LOG = LogFactory.getLog(MultiThreadedUpdater.class);
private Set<HBaseUpdaterThread> updaters = new HashSet<HBaseUpdaterThread>();
protected Set<HBaseUpdaterThread> updaters = new HashSet<HBaseUpdaterThread>();
private MultiThreadedWriterBase writer = null;
private boolean isBatchUpdate = false;
@ -86,12 +86,16 @@ public class MultiThreadedUpdater extends MultiThreadedWriterBase {
LOG.debug("Updating keys [" + startKey + ", " + endKey + ")");
}
addUpdaterThreads(numThreads);
startThreads(updaters);
}
protected void addUpdaterThreads(int numThreads) throws IOException {
for (int i = 0; i < numThreads; ++i) {
HBaseUpdaterThread updater = new HBaseUpdaterThread(i);
updaters.add(updater);
}
startThreads(updaters);
}
private long getNextKeyToUpdate() {
@ -115,12 +119,16 @@ public class MultiThreadedUpdater extends MultiThreadedWriterBase {
}
}
private class HBaseUpdaterThread extends Thread {
private final HTable table;
protected class HBaseUpdaterThread extends Thread {
protected final HTable table;
public HBaseUpdaterThread(int updaterId) throws IOException {
setName(getClass().getSimpleName() + "_" + updaterId);
table = new HTable(conf, tableName);
table = createTable();
}
protected HTable createTable() throws IOException {
return new HTable(conf, tableName);
}
public void run() {
@ -151,67 +159,73 @@ public class MultiThreadedUpdater extends MultiThreadedWriterBase {
numCols.addAndGet(1);
app = new Append(rowKey);
}
Result result = null;
Get get = new Get(rowKey);
get.addFamily(cf);
try {
Get get = new Get(rowKey);
get.addFamily(cf);
get = dataGenerator.beforeGet(rowKeyBase, get);
result = table.get(get);
} catch (IOException ie) {
LOG.warn("Failed to get the row for key = ["
+ rowKey + "], column family = [" + Bytes.toString(cf) + "]", ie);
} catch (Exception e) {
// Ideally wont happen
LOG.warn("Failed to modify the get from the load generator = [" + get.getRow()
+ "], column family = [" + Bytes.toString(cf) + "]", e);
}
Result result = getRow(get, rowKeyBase, cf);
Map<byte[], byte[]> columnValues =
result != null ? result.getFamilyMap(cf) : null;
if (columnValues == null) {
failedKeySet.add(rowKeyBase);
LOG.error("Failed to update the row with key = ["
+ rowKey + "], since we could not get the original row");
int specialPermCellInsertionFactor = Integer.parseInt(dataGenerator.getArgs()[1]);
if (((int) rowKeyBase % specialPermCellInsertionFactor == 0)) {
LOG.info("Null result expected for the rowkey " + Bytes.toString(rowKey));
} else {
failedKeySet.add(rowKeyBase);
LOG.error("Failed to update the row with key = [" + rowKey
+ "], since we could not get the original row");
}
}
for (byte[] column : columnValues.keySet()) {
if (Bytes.equals(column, INCREMENT)
|| Bytes.equals(column, MUTATE_INFO)) {
continue;
}
MutationType mt = MutationType.valueOf(
RandomUtils.nextInt(MutationType.values().length));
long columnHash = Arrays.hashCode(column);
long hashCode = cfHash + columnHash;
byte[] hashCodeBytes = Bytes.toBytes(hashCode);
byte[] checkedValue = HConstants.EMPTY_BYTE_ARRAY;
if (hashCode % 2 == 0) {
Cell kv = result.getColumnLatestCell(cf, column);
checkedValue = kv != null ? CellUtil.cloneValue(kv) : null;
Preconditions.checkNotNull(checkedValue,
"Column value to be checked should not be null");
}
buf.setLength(0); // Clear the buffer
buf.append("#").append(Bytes.toString(column)).append(":");
++columnCount;
switch (mt) {
case PUT:
Put put = new Put(rowKey);
put.add(cf, column, hashCodeBytes);
mutate(table, put, rowKeyBase, rowKey, cf, column, checkedValue);
buf.append(MutationType.PUT.getNumber());
break;
case DELETE:
Delete delete = new Delete(rowKey);
// Delete all versions since a put
// could be called multiple times if CM is used
delete.deleteColumns(cf, column);
mutate(table, delete, rowKeyBase, rowKey, cf, column, checkedValue);
buf.append(MutationType.DELETE.getNumber());
break;
default:
buf.append(MutationType.APPEND.getNumber());
app.add(cf, column, hashCodeBytes);
}
app.add(cf, MUTATE_INFO, Bytes.toBytes(buf.toString()));
if (!isBatchUpdate) {
mutate(table, app, rowKeyBase);
numCols.addAndGet(1);
app = new Append(rowKey);
if(columnValues != null) {
for (byte[] column : columnValues.keySet()) {
if (Bytes.equals(column, INCREMENT) || Bytes.equals(column, MUTATE_INFO)) {
continue;
}
MutationType mt = MutationType
.valueOf(RandomUtils.nextInt(MutationType.values().length));
long columnHash = Arrays.hashCode(column);
long hashCode = cfHash + columnHash;
byte[] hashCodeBytes = Bytes.toBytes(hashCode);
byte[] checkedValue = HConstants.EMPTY_BYTE_ARRAY;
if (hashCode % 2 == 0) {
Cell kv = result.getColumnLatestCell(cf, column);
checkedValue = kv != null ? CellUtil.cloneValue(kv) : null;
Preconditions.checkNotNull(checkedValue,
"Column value to be checked should not be null");
}
buf.setLength(0); // Clear the buffer
buf.append("#").append(Bytes.toString(column)).append(":");
++columnCount;
switch (mt) {
case PUT:
Put put = new Put(rowKey);
put.add(cf, column, hashCodeBytes);
mutate(table, put, rowKeyBase, rowKey, cf, column, checkedValue);
buf.append(MutationType.PUT.getNumber());
break;
case DELETE:
Delete delete = new Delete(rowKey);
// Delete all versions since a put
// could be called multiple times if CM is used
delete.deleteColumns(cf, column);
mutate(table, delete, rowKeyBase, rowKey, cf, column, checkedValue);
buf.append(MutationType.DELETE.getNumber());
break;
default:
buf.append(MutationType.APPEND.getNumber());
app.add(cf, column, hashCodeBytes);
}
app.add(cf, MUTATE_INFO, Bytes.toBytes(buf.toString()));
if (!isBatchUpdate) {
mutate(table, app, rowKeyBase);
numCols.addAndGet(1);
app = new Append(rowKey);
}
}
}
}
@ -230,14 +244,74 @@ public class MultiThreadedUpdater extends MultiThreadedWriterBase {
}
}
} finally {
try {
table.close();
} catch (IOException e) {
LOG.error("Error closing table", e);
}
closeHTable();
numThreadsWorking.decrementAndGet();
}
}
protected void closeHTable() {
try {
if (table != null) {
table.close();
}
} catch (IOException e) {
LOG.error("Error closing table", e);
}
}
protected Result getRow(Get get, long rowKeyBase, byte[] cf) {
Result result = null;
try {
result = table.get(get);
} catch (IOException ie) {
LOG.warn(
"Failed to get the row for key = [" + get.getRow() + "], column family = ["
+ Bytes.toString(cf) + "]", ie);
}
return result;
}
public void mutate(HTable table, Mutation m, long keyBase) {
mutate(table, m, keyBase, null, null, null, null);
}
public void mutate(HTable table, Mutation m,
long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) {
long start = System.currentTimeMillis();
try {
m = dataGenerator.beforeMutate(keyBase, m);
if (m instanceof Increment) {
table.increment((Increment)m);
} else if (m instanceof Append) {
table.append((Append)m);
} else if (m instanceof Put) {
table.checkAndPut(row, cf, q, v, (Put)m);
} else if (m instanceof Delete) {
table.checkAndDelete(row, cf, q, v, (Delete)m);
} else {
throw new IllegalArgumentException(
"unsupported mutation " + m.getClass().getSimpleName());
}
totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
} catch (IOException e) {
failedKeySet.add(keyBase);
String exceptionInfo;
if (e instanceof RetriesExhaustedWithDetailsException) {
RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
exceptionInfo = aggEx.getExhaustiveDescription();
} else {
StringWriter stackWriter = new StringWriter();
PrintWriter pw = new PrintWriter(stackWriter);
e.printStackTrace(pw);
pw.flush();
exceptionInfo = StringUtils.stringifyException(e);
}
LOG.error("Failed to mutate: " + keyBase + " after " +
(System.currentTimeMillis() - start) +
"ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: "
+ exceptionInfo);
}
}
}
@Override

View File

@ -0,0 +1,264 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
/**
* A MultiThreadUpdater that helps to work with ACL
*/
public class MultiThreadedUpdaterWithACL extends MultiThreadedUpdater {
private static final Log LOG = LogFactory.getLog(MultiThreadedUpdaterWithACL.class);
private final static String COMMA= ",";
private User userOwner;
/**
* Maps user with Table instance. Because the table instance has to be created
* per user inorder to work in that user's context
*/
private Map<String, HTable> userVsTable = new HashMap<String, HTable>();
private Map<String, User> users = new HashMap<String, User>();
private String[] userNames;
public MultiThreadedUpdaterWithACL(LoadTestDataGenerator dataGen, Configuration conf,
TableName tableName, double updatePercent, User userOwner) {
super(dataGen, conf, tableName, updatePercent);
this.userOwner = userOwner;
userNames = dataGenerator.getArgs()[0].split(COMMA);
}
@Override
protected void addUpdaterThreads(int numThreads) throws IOException {
for (int i = 0; i < numThreads; ++i) {
HBaseUpdaterThread updater = new HBaseUpdaterThreadWithACL(i);
updaters.add(updater);
}
}
public class HBaseUpdaterThreadWithACL extends HBaseUpdaterThread {
private HTable table;
private MutateAccessAction mutateAction = new MutateAccessAction();
public HBaseUpdaterThreadWithACL(int updaterId) throws IOException {
super(updaterId);
}
@Override
protected HTable createTable() throws IOException {
return null;
}
@Override
protected void closeHTable() {
try {
if (table != null) {
table.close();
}
for (HTable table : userVsTable.values()) {
try {
table.close();
} catch (Exception e) {
LOG.error("Error while closing the table " + table.getName(), e);
}
}
} catch (Exception e) {
LOG.error("Error while closing the HTable "+table.getName(), e);
}
}
@Override
protected Result getRow(final Get get, final long rowKeyBase, final byte[] cf) {
PrivilegedExceptionAction<Object> action = new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
Result res = null;
HTable localTable = null;
try {
int mod = ((int) rowKeyBase % userNames.length);
if (userVsTable.get(userNames[mod]) == null) {
localTable = new HTable(conf, tableName);
userVsTable.put(userNames[mod], localTable);
res = localTable.get(get);
} else {
localTable = userVsTable.get(userNames[mod]);
res = localTable.get(get);
}
} catch (IOException ie) {
LOG.warn("Failed to get the row for key = [" + get.getRow() + "], column family = ["
+ Bytes.toString(cf) + "]", ie);
}
return res;
}
};
if (userNames != null && userNames.length > 0) {
int mod = ((int) rowKeyBase % userNames.length);
User user;
if(!users.containsKey(userNames[mod])) {
UserGroupInformation realUserUgi = UserGroupInformation.createRemoteUser(userNames[mod]);
user = User.create(realUserUgi);
users.put(userNames[mod], user);
} else {
user = users.get(userNames[mod]);
}
try {
Result result = (Result) user.runAs(action);
return result;
} catch (Exception ie) {
LOG.warn("Failed to get the row for key = [" + get.getRow() + "], column family = ["
+ Bytes.toString(cf) + "]", ie);
}
}
// This means that no users were present
return null;
}
@Override
public void mutate(final HTable table, Mutation m, final long keyBase, final byte[] row,
final byte[] cf, final byte[] q, final byte[] v) {
final long start = System.currentTimeMillis();
try {
m = dataGenerator.beforeMutate(keyBase, m);
mutateAction.setMutation(m);
mutateAction.setCF(cf);
mutateAction.setRow(row);
mutateAction.setQualifier(q);
mutateAction.setValue(v);
mutateAction.setStartTime(start);
mutateAction.setKeyBase(keyBase);
userOwner.runAs(mutateAction);
} catch (IOException e) {
recordFailure(m, keyBase, start, e);
} catch (InterruptedException e) {
failedKeySet.add(keyBase);
}
}
class MutateAccessAction implements PrivilegedExceptionAction<Object> {
private HTable table;
private long start;
private Mutation m;
private long keyBase;
private byte[] row;
private byte[] cf;
private byte[] q;
private byte[] v;
public MutateAccessAction() {
}
public void setStartTime(final long start) {
this.start = start;
}
public void setMutation(final Mutation m) {
this.m = m;
}
public void setRow(final byte[] row) {
this.row = row;
}
public void setCF(final byte[] cf) {
this.cf = cf;
}
public void setQualifier(final byte[] q) {
this.q = q;
}
public void setValue(final byte[] v) {
this.v = v;
}
public void setKeyBase(final long keyBase) {
this.keyBase = keyBase;
}
@Override
public Object run() throws Exception {
try {
if (table == null) {
table = new HTable(conf, tableName);
}
if (m instanceof Increment) {
table.increment((Increment) m);
} else if (m instanceof Append) {
table.append((Append) m);
} else if (m instanceof Put) {
table.checkAndPut(row, cf, q, v, (Put) m);
} else if (m instanceof Delete) {
table.checkAndDelete(row, cf, q, v, (Delete) m);
} else {
throw new IllegalArgumentException("unsupported mutation "
+ m.getClass().getSimpleName());
}
totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
} catch (IOException e) {
recordFailure(m, keyBase, start, e);
}
return null;
}
}
private void recordFailure(final Mutation m, final long keyBase,
final long start, IOException e) {
failedKeySet.add(keyBase);
String exceptionInfo;
if (e instanceof RetriesExhaustedWithDetailsException) {
RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
exceptionInfo = aggEx.getExhaustiveDescription();
} else {
StringWriter stackWriter = new StringWriter();
PrintWriter pw = new PrintWriter(stackWriter);
e.printStackTrace(pw);
pw.flush();
exceptionInfo = StringUtils.stringifyException(e);
}
LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start)
+ "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: "
+ exceptionInfo);
}
}
}

View File

@ -22,6 +22,8 @@ import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
@ -33,15 +35,17 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
/** Creates multiple threads that write key/values into the */
public class MultiThreadedWriter extends MultiThreadedWriterBase {
private static final Log LOG = LogFactory.getLog(MultiThreadedWriter.class);
private Set<HBaseWriterThread> writers = new HashSet<HBaseWriterThread>();
protected Set<HBaseWriterThread> writers = new HashSet<HBaseWriterThread>();
private boolean isMultiPut = false;
protected boolean isMultiPut = false;
public MultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf,
TableName tableName) {
@ -61,20 +65,28 @@ public class MultiThreadedWriter extends MultiThreadedWriterBase {
LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")");
}
for (int i = 0; i < numThreads; ++i) {
HBaseWriterThread writer = new HBaseWriterThread(i);
writers.add(writer);
}
createWriterThreads(numThreads);
startThreads(writers);
}
private class HBaseWriterThread extends Thread {
protected void createWriterThreads(int numThreads) throws IOException {
for (int i = 0; i < numThreads; ++i) {
HBaseWriterThread writer = new HBaseWriterThread(i);
writers.add(writer);
}
}
public class HBaseWriterThread extends Thread {
private final HTable table;
public HBaseWriterThread(int writerId) throws IOException {
setName(getClass().getSimpleName() + "_" + writerId);
table = new HTable(conf, tableName);
table = createTable();
}
protected HTable createTable() throws IOException {
return new HTable(conf, tableName);
}
public void run() {
@ -119,14 +131,44 @@ public class MultiThreadedWriter extends MultiThreadedWriterBase {
}
}
} finally {
try {
table.close();
} catch (IOException e) {
LOG.error("Error closing table", e);
}
closeHTable();
numThreadsWorking.decrementAndGet();
}
}
public void insert(HTable table, Put put, long keyBase) {
long start = System.currentTimeMillis();
try {
put = (Put) dataGenerator.beforeMutate(keyBase, put);
table.put(put);
totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
} catch (IOException e) {
failedKeySet.add(keyBase);
String exceptionInfo;
if (e instanceof RetriesExhaustedWithDetailsException) {
RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e;
exceptionInfo = aggEx.getExhaustiveDescription();
} else {
StringWriter stackWriter = new StringWriter();
PrintWriter pw = new PrintWriter(stackWriter);
e.printStackTrace(pw);
pw.flush();
exceptionInfo = StringUtils.stringifyException(e);
}
LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start)
+ "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow())
+ "; errors: " + exceptionInfo);
}
}
protected void closeHTable() {
try {
if (table != null) {
table.close();
}
} catch (IOException e) {
LOG.error("Error closing table", e);
}
}
}
@Override

View File

@ -19,8 +19,6 @@
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
@ -36,10 +34,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
/** Creates multiple threads that write key/values into the */
public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
@ -171,31 +166,6 @@ public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
return failedKeySet.size();
}
public void insert(HTable table, Put put, long keyBase) {
long start = System.currentTimeMillis();
try {
put = (Put) dataGenerator.beforeMutate(keyBase, put);
table.put(put);
totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
} catch (IOException e) {
failedKeySet.add(keyBase);
String exceptionInfo;
if (e instanceof RetriesExhaustedWithDetailsException) {
RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e;
exceptionInfo = aggEx.getExhaustiveDescription();
} else {
StringWriter stackWriter = new StringWriter();
PrintWriter pw = new PrintWriter(stackWriter);
e.printStackTrace(pw);
pw.flush();
exceptionInfo = StringUtils.stringifyException(e);
}
LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start) +
"ms; region information: " + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: "
+ exceptionInfo);
}
}
/**
* The max key until which all keys have been inserted/updated (successfully or not).
* @return the last key that we have inserted/updated all keys up to (inclusive)

View File

@ -0,0 +1,157 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
/**
* MultiThreadedWriter that helps in testing ACL
*/
public class MultiThreadedWriterWithACL extends MultiThreadedWriter {
private static final Log LOG = LogFactory.getLog(MultiThreadedWriterWithACL.class);
private User userOwner;
public MultiThreadedWriterWithACL(LoadTestDataGenerator dataGen, Configuration conf,
TableName tableName, User userOwner) {
super(dataGen, conf, tableName);
this.userOwner = userOwner;
}
@Override
public void start(long startKey, long endKey, int numThreads) throws IOException {
super.start(startKey, endKey, numThreads);
}
@Override
protected void createWriterThreads(int numThreads) throws IOException {
for (int i = 0; i < numThreads; ++i) {
HBaseWriterThread writer = new HBaseWriterThreadWithACL(i);
writers.add(writer);
}
}
public class HBaseWriterThreadWithACL extends HBaseWriterThread {
private HTable table;
private WriteAccessAction writerAction = new WriteAccessAction();
public HBaseWriterThreadWithACL(int writerId) throws IOException {
super(writerId);
}
@Override
protected HTable createTable() throws IOException {
return null;
}
@Override
protected void closeHTable() {
if (table != null) {
try {
table.close();
} catch (Exception e) {
LOG.error("Error in closing the table "+table.getName(), e);
}
}
}
@Override
public void insert(final HTable table, Put put, final long keyBase) {
final long start = System.currentTimeMillis();
try {
put = (Put) dataGenerator.beforeMutate(keyBase, put);
writerAction.setPut(put);
writerAction.setKeyBase(keyBase);
writerAction.setStartTime(start);
userOwner.runAs(writerAction);
} catch (IOException e) {
recordFailure(table, put, keyBase, start, e);
} catch (InterruptedException e) {
failedKeySet.add(keyBase);
}
}
class WriteAccessAction implements PrivilegedExceptionAction<Object> {
private Put put;
private long keyBase;
private long start;
public WriteAccessAction() {
}
public void setPut(final Put put) {
this.put = put;
}
public void setKeyBase(final long keyBase) {
this.keyBase = keyBase;
}
public void setStartTime(final long start) {
this.start = start;
}
@Override
public Object run() throws Exception {
try {
if (table == null) {
table = new HTable(conf, tableName);
}
table.put(put);
} catch (IOException e) {
recordFailure(table, put, keyBase, start, e);
}
return null;
}
}
}
private void recordFailure(final HTable table, final Put put, final long keyBase,
final long start, IOException e) {
failedKeySet.add(keyBase);
String exceptionInfo;
if (e instanceof RetriesExhaustedWithDetailsException) {
RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
exceptionInfo = aggEx.getExhaustiveDescription();
} else {
StringWriter stackWriter = new StringWriter();
PrintWriter pw = new PrintWriter(stackWriter);
e.printStackTrace(pw);
pw.flush();
exceptionInfo = StringUtils.stringifyException(e);
}
LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start)
+ "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: "
+ exceptionInfo);
}
}

View File

@ -21,9 +21,9 @@ import java.io.IOException;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;

View File

@ -25,8 +25,8 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;

View File

@ -39,6 +39,8 @@ public abstract class LoadTestDataGenerator {
// which can be incremented later on during updates.
public final static byte[] INCREMENT = "increment".getBytes();
protected String[] args;
public LoadTestDataGenerator() {
}
@ -56,10 +58,12 @@ public abstract class LoadTestDataGenerator {
/**
* initialize the LoadTestDataGenerator
* @param args init args
*
* @param args
* init args
*/
public void initialize(String[] args) {
this.args = args;
}
/**
@ -134,4 +138,12 @@ public abstract class LoadTestDataGenerator {
public Get beforeGet(long rowkeyBase, Get get) throws IOException {
return get;
}
/**
* Return the arguments passed to the generator as list of object
* @return
*/
public String[] getArgs() {
return this.args;
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.util.test;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.util.MultiThreadedAction.DefaultDataGenerator;
@InterfaceAudience.Private
public class LoadTestDataGeneratorWithACL extends DefaultDataGenerator {
private static final Log LOG = LogFactory.getLog(LoadTestDataGeneratorWithACL.class);
private String[] userNames = null;
private static final String COMMA = ",";
private int specialPermCellInsertionFactor = 100;
public LoadTestDataGeneratorWithACL(int minValueSize, int maxValueSize, int minColumnsPerKey,
int maxColumnsPerKey, byte[]... columnFamilies) {
super(minValueSize, maxValueSize, minColumnsPerKey, maxColumnsPerKey, columnFamilies);
}
@Override
public void initialize(String[] args) {
super.initialize(args);
if (args.length != 2) {
throw new IllegalArgumentException(
"LoadTestDataGeneratorWithACL can have "
+ "1st arguement which would be the user list and the 2nd argument "
+ "should be the factor representing "
+ "the row keys for which only write ACLs will be added.");
}
String temp = args[0];
// This will be comma separated list of expressions.
this.userNames = temp.split(COMMA);
this.specialPermCellInsertionFactor = Integer.parseInt(args[1]);
}
@Override
public Mutation beforeMutate(long rowkeyBase, Mutation m) throws IOException {
if (!(m instanceof Delete)) {
if (userNames != null && userNames.length > 0) {
int mod = ((int) rowkeyBase % this.userNames.length);
if (((int) rowkeyBase % specialPermCellInsertionFactor) == 0) {
// These cells cannot be read back when running as user userName[mod]
if (LOG.isTraceEnabled()) {
LOG.trace("Adding special perm " + rowkeyBase);
}
m.setACL(userNames[mod], new Permission(Permission.Action.WRITE));
} else {
m.setACL(userNames[mod], new Permission(Permission.Action.READ));
}
}
}
return m;
}
}