HBASE-11509 Forward port HBASE-11039 to trunk and branch-1 after HBASE-11489 (Ram)

commit f0035e6837 (parent 93e7ec42d1)
@@ -23,14 +23,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
 
-import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
@@ -40,11 +41,12 @@ import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
+import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService.BlockingInterface;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GrantRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GrantResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.RevokeRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.RevokeResponse;
-import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService.BlockingInterface;
+import org.apache.hadoop.hbase.util.ByteStringer;
 
 import com.google.protobuf.ByteString;
 
@@ -119,6 +121,21 @@ public class AccessControlClient {
     }
   }
 
+  public static boolean isAccessControllerRunning(Configuration conf)
+      throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
+    TableName aclTableName = TableName
+        .valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "acl");
+    HBaseAdmin ha = null;
+    try {
+      ha = new HBaseAdmin(conf);
+      return ha.isTableAvailable(aclTableName.getNameAsString());
+    } finally {
+      if (ha != null) {
+        ha.close();
+      }
+    }
+  }
+
   /**
    * Revokes the permission on the table
    * @param conf
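The new AccessControlClient.isAccessControllerRunning(conf) probes for the system "acl" table, so callers can skip grant/revoke calls on clusters where the AccessController coprocessor is not deployed. A minimal caller sketch, assuming a Configuration and TableName are already in scope; the helper name and wrapper class are illustrative only, not part of this commit:

  // Hypothetical helper: grant READ only when the acl table is actually available.
  public static void grantReadIfAclEnabled(Configuration conf, TableName table, String user)
      throws IOException {
    try {
      if (!AccessControlClient.isAccessControllerRunning(conf)) {
        return; // no AccessController deployed, nothing to grant against
      }
      AccessControlClient.grant(conf, table, user, null, null,
          AccessControlProtos.Permission.Action.READ);
    } catch (Throwable t) {
      throw new IOException(t);
    }
  }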
@@ -41,4 +41,9 @@ public class CellVisibility {
   public String getExpression() {
     return this.expression;
   }
+
+  @Override
+  public String toString() {
+    return this.expression;
+  }
 }
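With the added toString(), a CellVisibility prints its label expression instead of the default Object representation, which makes log lines about cell-level labels readable. A small illustrative sketch (row key, labels and the LOG handle are assumed values, not from this change):

  CellVisibility vis = new CellVisibility("secret|topsecret");
  Put put = new Put(Bytes.toBytes("row1"));
  put.setCellVisibility(vis);
  // Logs the expression itself, e.g. "applying secret|topsecret", rather than a class@hash string.
  LOG.info("applying " + vis);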
@@ -160,22 +160,22 @@ import com.google.common.collect.Sets;
  */
 @Category(IntegrationTests.class)
 public class IntegrationTestBigLinkedList extends IntegrationTestBase {
-  private static final byte[] NO_KEY = new byte[1];
+  protected static final byte[] NO_KEY = new byte[1];
 
   protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table";
 
   protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList";
 
-  private static byte[] FAMILY_NAME = Bytes.toBytes("meta");
+  protected static byte[] FAMILY_NAME = Bytes.toBytes("meta");
 
   //link to the id of the prev node in the linked list
-  private static final byte[] COLUMN_PREV = Bytes.toBytes("prev");
+  protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev");
 
   //identifier of the mapred task that generated this row
-  private static final byte[] COLUMN_CLIENT = Bytes.toBytes("client");
+  protected static final byte[] COLUMN_CLIENT = Bytes.toBytes("client");
 
   //the id of the row within the same client.
-  private static final byte[] COLUMN_COUNT = Bytes.toBytes("count");
+  protected static final byte[] COLUMN_COUNT = Bytes.toBytes("count");
 
   /** How many rows to write per map task. This has to be a multiple of 25M */
   private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY
@@ -198,8 +198,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
   private static final int WRAP_DEFAULT = 25;
   private static final int ROWKEY_LENGTH = 16;
 
-  private String toRun;
-  private String[] otherArgs;
+  protected String toRun;
+  protected String[] otherArgs;
 
   static class CINode {
     byte[] key;
@@ -345,9 +345,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       protected void setup(Context context) throws IOException, InterruptedException {
         id = Bytes.toBytes("Job: "+context.getJobID() + " Task: " + context.getTaskAttemptID());
         Configuration conf = context.getConfiguration();
-        table = new HTable(conf, getTableName(conf));
-        table.setAutoFlush(false, true);
-        table.setWriteBufferSize(4 * 1024 * 1024);
+        instantiateHTable(conf);
         this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT);
         current = new byte[this.width][];
         int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT);
@@ -359,6 +357,12 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
         }
       }
 
+      protected void instantiateHTable(Configuration conf) throws IOException {
+        table = new HTable(conf, getTableName(conf));
+        table.setAutoFlush(false, true);
+        table.setWriteBufferSize(4 * 1024 * 1024);
+      }
+
       @Override
       protected void cleanup(Context context) throws IOException ,InterruptedException {
         table.close();
@@ -400,7 +404,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
         first[first.length - 1] = ez;
       }
 
-      private void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id)
+      protected void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id)
           throws IOException {
         for (int i = 0; i < current.length; i++) {
           Put put = new Put(current[i]);
@@ -495,7 +499,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       FileOutputFormat.setOutputPath(job, tmpOutput);
       job.setOutputFormatClass(SequenceFileOutputFormat.class);
 
-      boolean success = job.waitForCompletion(true);
+      boolean success = jobCompletion(job);
 
       return success ? 0 : 1;
     }
@@ -517,7 +521,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 
       setJobConf(job, numMappers, numNodes, width, wrapMuplitplier);
 
-      job.setMapperClass(GeneratorMapper.class);
+      setMapperForGenerator(job);
 
       job.setOutputFormatClass(NullOutputFormat.class);
 
@@ -526,11 +530,21 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
       TableMapReduceUtil.initCredentials(job);
 
-      boolean success = job.waitForCompletion(true);
+      boolean success = jobCompletion(job);
 
       return success ? 0 : 1;
     }
 
+    protected boolean jobCompletion(Job job) throws IOException, InterruptedException,
+        ClassNotFoundException {
+      boolean success = job.waitForCompletion(true);
+      return success;
+    }
+
+    protected void setMapperForGenerator(Job job) {
+      job.setMapperClass(GeneratorMapper.class);
+    }
+
     public int run(int numMappers, long numNodes, Path tmpOutput,
         Integer width, Integer wrapMuplitplier) throws Exception {
       int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMuplitplier);
@@ -548,9 +562,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
   static class Verify extends Configured implements Tool {
 
     private static final Log LOG = LogFactory.getLog(Verify.class);
-    private static final BytesWritable DEF = new BytesWritable(NO_KEY);
+    protected static final BytesWritable DEF = new BytesWritable(NO_KEY);
 
-    private Job job;
+    protected Job job;
 
     public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
       private BytesWritable row = new BytesWritable();
@@ -727,28 +741,32 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       }
 
       if (!success) {
-        Configuration conf = job.getConfiguration();
-        HConnection conn = HConnectionManager.getConnection(conf);
-        TableName tableName = getTableName(conf);
-        CounterGroup g = counters.getGroup("undef");
-        Iterator<Counter> it = g.iterator();
-        while (it.hasNext()) {
-          String keyString = it.next().getName();
-          byte[] key = Bytes.toBytes(keyString);
-          HRegionLocation loc = conn.relocateRegion(tableName, key);
-          LOG.error("undefined row " + keyString + ", " + loc);
-        }
-        g = counters.getGroup("unref");
-        it = g.iterator();
-        while (it.hasNext()) {
-          String keyString = it.next().getName();
-          byte[] key = Bytes.toBytes(keyString);
-          HRegionLocation loc = conn.relocateRegion(tableName, key);
-          LOG.error("unreferred row " + keyString + ", " + loc);
-        }
+        handleFailure(counters);
       }
       return success;
     }
 
+    protected void handleFailure(Counters counters) throws IOException {
+      Configuration conf = job.getConfiguration();
+      HConnection conn = HConnectionManager.getConnection(conf);
+      TableName tableName = getTableName(conf);
+      CounterGroup g = counters.getGroup("undef");
+      Iterator<Counter> it = g.iterator();
+      while (it.hasNext()) {
+        String keyString = it.next().getName();
+        byte[] key = Bytes.toBytes(keyString);
+        HRegionLocation loc = conn.relocateRegion(tableName, key);
+        LOG.error("undefined row " + keyString + ", " + loc);
+      }
+      g = counters.getGroup("unref");
+      it = g.iterator();
+      while (it.hasNext()) {
+        String keyString = it.next().getName();
+        byte[] key = Bytes.toBytes(keyString);
+        HRegionLocation loc = conn.relocateRegion(tableName, key);
+        LOG.error("unreferred row " + keyString + ", " + loc);
+      }
+    }
     }
 
     /**
@@ -1157,7 +1175,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
     }
   }
 
-  private static void setJobScannerConf(Job job) {
+  public static void setJobScannerConf(Job job) {
     // Make sure scanners log something useful to make debugging possible.
     job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
     job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 100000);
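Taken together, these IntegrationTestBigLinkedList changes widen field visibility and turn table setup, mapper selection, job completion and failure reporting into protected hooks (instantiateHTable, setMapperForGenerator, jobCompletion, persist, handleFailure). A minimal sketch of a subclass plugging into two of the hooks; the class and mapper names here are hypothetical, and the real in-tree consumer is IntegrationTestBigLinkedListWithVisibility in the new file below:

  static class CustomGenerator extends Generator {
    @Override
    protected void setMapperForGenerator(Job job) {
      // swap in a custom mapper instead of the default GeneratorMapper
      job.setMapperClass(CustomGeneratorMapper.class);
    }

    static class CustomGeneratorMapper extends GeneratorMapper {
      @Override
      protected void instantiateHTable(Configuration conf) throws IOException {
        super.instantiateHTable(conf); // reuse the default HTable setup, or create other tables here
      }
    }
  }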
@@ -0,0 +1,665 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.test;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Iterator;
import java.util.UUID;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.IntegrationTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.mapreduce.Import;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessControlClient;
import org.apache.hadoop.hbase.security.visibility.Authorizations;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.security.visibility.VisibilityClient;
import org.apache.hadoop.hbase.security.visibility.VisibilityController;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * IT test used to verify the deletes with visibility labels.
 * The test creates three tables tablename_0, tablename_1 and tablename_2 and each table
 * is associated with a unique pair of labels.
 * Another common table with the name 'commontable' is created and it has the data combined
 * from all these 3 tables such that there are 3 versions of every row but the visibility label
 * in every row corresponds to the table from which the row originated.
 * Then deletes are issued to the common table by selecting the visibility label
 * associated with each of the smaller tables.
 * After the delete is issued with one set of visibility labels we try to scan the common table
 * with each of the visibility pairs defined for the 3 tables.
 * So after the first delete is issued, a scan with the first set of visibility labels would
 * return zero result whereas the scan issued with the other two sets of visibility labels
 * should return all the rows corresponding to that set of visibility labels. The above
 * process of delete and scan is repeated until after the last set of visibility labels are
 * used for the deletes the common table should not return any row.
 *
 * To use this
 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedListWithVisibility Loop 1 1 20000 /tmp 1 10000
 * or
 * ./hbase org.apache.hadoop.hbase.IntegrationTestsDriver -r .*IntegrationTestBigLinkedListWithVisibility.*
 */
@Category(IntegrationTests.class)
public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestBigLinkedList {

  private static final String CONFIDENTIAL = "confidential";
  private static final String TOPSECRET = "topsecret";
  private static final String SECRET = "secret";
  private static final String PUBLIC = "public";
  private static final String PRIVATE = "private";
  private static final String EVERYONE = "everyone";
  private static final String RESTRICTED = "restricted";
  private static final String GROUP = "group";
  private static final String PREVILIGED = "previliged";
  private static final String OPEN = "open";
  public static String labels = CONFIDENTIAL + "," + TOPSECRET + "," + SECRET + "," + RESTRICTED
      + "," + PRIVATE + "," + PREVILIGED + "," + GROUP + "," + OPEN + "," + PUBLIC + "," + EVERYONE;
  private static final String COMMA = ",";
  private static final String UNDER_SCORE = "_";
  public static int DEFAULT_TABLES_COUNT = 3;
  public static String tableName = "tableName";
  public static final String COMMON_TABLE_NAME = "commontable";
  public static final String LABELS_KEY = "LABELS";
  public static final String INDEX_KEY = "INDEX";
  private static User USER;
  private static final String OR = "|";
  private static String USER_OPT = "user";
  private static String userName = "user1";

  static class VisibilityGenerator extends Generator {
    private static final Log LOG = LogFactory.getLog(VisibilityGenerator.class);

    @Override
    protected void createSchema() throws IOException {
      LOG.info("Creating tables");
      // Create three tables
      boolean acl = AccessControlClient.isAccessControllerRunning(getConf());
      if(!acl) {
        LOG.info("No ACL available.");
      }
      HBaseAdmin admin = new HBaseAdmin(getConf());
      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
        TableName tableName = IntegrationTestBigLinkedListWithVisibility.getTableName(i);
        createTable(admin, tableName, false, acl);
      }
      TableName tableName = TableName.valueOf(COMMON_TABLE_NAME);
      createTable(admin, tableName, true, acl);
      admin.close();
    }

    private void createTable(HBaseAdmin admin, TableName tableName, boolean setVersion,
        boolean acl) throws IOException {
      if (!admin.tableExists(tableName)) {
        HTableDescriptor htd = new HTableDescriptor(tableName);
        HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME);
        if (setVersion) {
          family.setMaxVersions(DEFAULT_TABLES_COUNT);
        }
        htd.addFamily(family);
        admin.createTable(htd);
        if (acl) {
          LOG.info("Granting permissions for user " + USER.getShortName());
          AccessControlProtos.Permission.Action[] actions = { AccessControlProtos.Permission.Action.READ };
          try {
            AccessControlClient.grant(getConf(), tableName, USER.getShortName(), null, null,
                actions);
          } catch (Throwable e) {
            LOG.fatal("Error in granting permission for the user " + USER.getShortName(), e);
            throw new IOException(e);
          }
        }
      }
    }

    @Override
    protected void setMapperForGenerator(Job job) {
      job.setMapperClass(VisibilityGeneratorMapper.class);
    }

    static class VisibilityGeneratorMapper extends GeneratorMapper {
      HTable[] tables = new HTable[DEFAULT_TABLES_COUNT];
      HTable commonTable = null;

      @Override
      protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException,
          InterruptedException {
        super.setup(context);
      }

      @Override
      protected void instantiateHTable(Configuration conf) throws IOException {
        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
          HTable table = new HTable(conf, getTableName(i));
          table.setAutoFlush(true, true);
          //table.setWriteBufferSize(4 * 1024 * 1024);
          this.tables[i] = table;
        }
      }

      @Override
      protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context)
          throws IOException, InterruptedException {
        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
          if (tables[i] != null) {
            tables[i].close();
          }
        }
      }

      @Override
      protected void persist(org.apache.hadoop.mapreduce.Mapper.Context output, long count,
          byte[][] prev, byte[][] current, byte[] id) throws IOException {
        String visibilityExps = "";
        String[] split = labels.split(COMMA);
        for (int i = 0; i < current.length; i++) {
          for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
            Put put = new Put(current[i]);
            put.add(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);

            if (count >= 0) {
              put.add(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
            }
            if (id != null) {
              put.add(FAMILY_NAME, COLUMN_CLIENT, id);
            }
            visibilityExps = split[j * 2] + OR + split[(j * 2) + 1];
            put.setCellVisibility(new CellVisibility(visibilityExps));
            tables[j].put(put);
            try {
              Thread.sleep(1);
            } catch (InterruptedException e) {
              throw new IOException();
            }
          }
          if (i % 1000 == 0) {
            // Tickle progress every so often else maprunner will think us hung
            output.progress();
          }
        }
        for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
          tables[j].flushCommits();
        }
      }
    }
  }

  static class Copier extends Configured implements Tool {
    private static final Log LOG = LogFactory.getLog(Copier.class);
    private TableName tableName;
    private int labelIndex;
    private boolean delete;

    public Copier(TableName tableName, int index, boolean delete) {
      this.tableName = tableName;
      this.labelIndex = index;
      this.delete = delete;
    }

    public int runCopier(String outputDir) throws Exception {
      Job job = null;
      Scan scan = null;
      job = new Job(getConf());
      job.setJobName("Data copier");
      job.getConfiguration().setInt("INDEX", labelIndex);
      job.getConfiguration().set("LABELS", labels);
      job.setJarByClass(getClass());
      scan = new Scan();
      scan.setCacheBlocks(false);
      scan.setRaw(true);

      String[] split = labels.split(COMMA);
      scan.setAuthorizations(new Authorizations(split[this.labelIndex * 2],
          split[(this.labelIndex * 2) + 1]));
      if (delete) {
        LOG.info("Running deletes");
      } else {
        LOG.info("Running copiers");
      }
      if (delete) {
        TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan,
            VisibilityDeleteImport.class, null, null, job);
      } else {
        TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan,
            VisibilityImport.class, null, null, job);
      }
      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
      job.getConfiguration().setBoolean("mapreduce.reduce.speculative", false);
      TableMapReduceUtil.initTableReducerJob(COMMON_TABLE_NAME, null, job, null, null, null, null);
      TableMapReduceUtil.addDependencyJars(job);
      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
      TableMapReduceUtil.initCredentials(job);
      job.setNumReduceTasks(0);
      boolean success = job.waitForCompletion(true);
      return success ? 0 : 1;
    }

    @Override
    public int run(String[] arg0) throws Exception {
      // TODO Auto-generated method stub
      return 0;
    }
  }

  static class VisibilityImport extends Import.Importer {
    private int index;
    private String labels;
    private String[] split;

    @Override
    public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) {
      index = context.getConfiguration().getInt(INDEX_KEY, -1);
      labels = context.getConfiguration().get(LABELS_KEY);
      split = labels.split(COMMA);
      super.setup(context);
    }

    @Override
    protected void addPutToKv(Put put, Cell kv) throws IOException {
      String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1];
      put.setCellVisibility(new CellVisibility(visibilityExps));
      super.addPutToKv(put, kv);
    }
  }

  static class VisibilityDeleteImport extends Import.Importer {
    private int index;
    private String labels;
    private String[] split;

    @Override
    public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) {
      index = context.getConfiguration().getInt(INDEX_KEY, -1);
      labels = context.getConfiguration().get(LABELS_KEY);
      split = labels.split(COMMA);
      super.setup(context);
    }

    // Creating delete here
    @Override
    protected void processKV(ImmutableBytesWritable key, Result result,
        org.apache.hadoop.mapreduce.Mapper.Context context, Put put,
        org.apache.hadoop.hbase.client.Delete delete) throws
        IOException, InterruptedException {
      String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1];
      for (Cell kv : result.rawCells()) {
        // skip if we filter it out
        if (kv == null)
          continue;
        // Create deletes here
        if (delete == null) {
          delete = new Delete(key.get());
        }
        delete.setCellVisibility(new CellVisibility(visibilityExps));
        delete.deleteFamily(kv.getFamily());
      }
      if (delete != null) {
        context.write(key, delete);
      }
    }
  }

  @Override
  protected void addOptions() {
    super.addOptions();
    addOptWithArg("u", USER_OPT, "User name");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    if (cmd.hasOption(USER_OPT)) {
      userName = cmd.getOptionValue(USER_OPT);
    }

  }
  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(null);
    Configuration conf = util.getConfiguration();
    conf.setInt(HFile.FORMAT_VERSION_KEY, 3);
    conf.set("hbase.coprocessor.master.classes", VisibilityController.class.getName());
    conf.set("hbase.coprocessor.region.classes", VisibilityController.class.getName());
    conf.set("hbase.superuser", User.getCurrent().getName());
    conf.setBoolean("dfs.permissions", false);
    USER = User.createUserForTesting(conf, userName, new String[] {});
    super.setUpCluster();
    addLabels();
  }

  static TableName getTableName(int i) {
    return TableName.valueOf(tableName + UNDER_SCORE + i);
  }

  private void addLabels() throws Exception {
    try {
      VisibilityClient.addLabels(util.getConfiguration(), labels.split(COMMA));
      VisibilityClient.setAuths(util.getConfiguration(), labels.split(COMMA), USER.getName());
    } catch (Throwable t) {
      throw new IOException(t);
    }
  }

  static class VisibilityVerify extends Verify {
    private static final Log LOG = LogFactory.getLog(VisibilityVerify.class);
    private TableName tableName;
    private int labelIndex;

    public VisibilityVerify(String tableName, int index) {
      this.tableName = TableName.valueOf(tableName);
      this.labelIndex = index;
    }

    @Override
    public int run(final Path outputDir, final int numReducers) throws Exception {
      LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers);
      PrivilegedExceptionAction<Integer> scanAction = new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws Exception {
          return doVerify(outputDir, numReducers);
        }
      };
      return USER.runAs(scanAction);
    }

    private int doVerify(Path outputDir, int numReducers) throws IOException, InterruptedException,
        ClassNotFoundException {
      job = new Job(getConf());

      job.setJobName("Link Verifier");
      job.setNumReduceTasks(numReducers);
      job.setJarByClass(getClass());

      setJobScannerConf(job);

      Scan scan = new Scan();
      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
      scan.setCaching(10000);
      scan.setCacheBlocks(false);
      String[] split = labels.split(COMMA);

      scan.setAuthorizations(new Authorizations(split[this.labelIndex * 2],
          split[(this.labelIndex * 2) + 1]));

      TableMapReduceUtil.initTableMapperJob(tableName.getName(), scan, VerifyMapper.class,
          BytesWritable.class, BytesWritable.class, job);
      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);

      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);

      job.setReducerClass(VerifyReducer.class);
      job.setOutputFormatClass(TextOutputFormat.class);
      TextOutputFormat.setOutputPath(job, outputDir);
      boolean success = job.waitForCompletion(true);

      return success ? 0 : 1;
    }

    @Override
    protected void handleFailure(Counters counters) throws IOException {
      Configuration conf = job.getConfiguration();
      HConnection conn = HConnectionManager.getConnection(conf);
      TableName tableName = TableName.valueOf(COMMON_TABLE_NAME);
      CounterGroup g = counters.getGroup("undef");
      Iterator<Counter> it = g.iterator();
      while (it.hasNext()) {
        String keyString = it.next().getName();
        byte[] key = Bytes.toBytes(keyString);
        HRegionLocation loc = conn.relocateRegion(tableName, key);
        LOG.error("undefined row " + keyString + ", " + loc);
      }
      g = counters.getGroup("unref");
      it = g.iterator();
      while (it.hasNext()) {
        String keyString = it.next().getName();
        byte[] key = Bytes.toBytes(keyString);
        HRegionLocation loc = conn.relocateRegion(tableName, key);
        LOG.error("unreferred row " + keyString + ", " + loc);
      }
    }
  }

  static class VisibilityLoop extends Loop {
    private static final int SLEEP_IN_MS = 5000;
    private static final Log LOG = LogFactory.getLog(VisibilityLoop.class);
    IntegrationTestBigLinkedListWithVisibility it;

    @Override
    protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width,
        Integer wrapMuplitplier) throws Exception {
      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID(); // create a random UUID.
      Path generatorOutput = new Path(outputPath, uuid.toString());

      Generator generator = new VisibilityGenerator();
      generator.setConf(getConf());
      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMuplitplier);
      if (retCode > 0) {
        throw new RuntimeException("Generator failed with return code: " + retCode);
      }
    }

    protected void runDelete(int numMappers, long numNodes, String outputDir, Integer width,
        Integer wrapMuplitplier, int tableIndex) throws Exception {
      LOG.info("Running copier on table "+IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex));
      Copier copier = new Copier(
          IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex), tableIndex, true);
      copier.setConf(getConf());
      copier.runCopier(outputDir);
      Thread.sleep(SLEEP_IN_MS);
    }

    protected void runVerify(String outputDir, int numReducers, long expectedNumNodes,
        boolean allTables) throws Exception {
      Path outputPath = new Path(outputDir);

      if (allTables) {
        for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
          LOG.info("Verifying table " + i);
          sleep(SLEEP_IN_MS);
          UUID uuid = UUID.randomUUID(); // create a random UUID.
          Path iterationOutput = new Path(outputPath, uuid.toString());
          Verify verify = new VisibilityVerify(getTableName(i).getNameAsString(), i);
          verify(numReducers, expectedNumNodes, iterationOutput, verify);
        }
      }
      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
        runVerifyCommonTable(outputDir, numReducers, expectedNumNodes, i);
      }
    }

    private void runVerify(String outputDir, int numReducers, long expectedNodes, int tableIndex)
        throws Exception {
      long temp = expectedNodes;
      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
        if (i <= tableIndex) {
          expectedNodes = 0;
        } else {
          expectedNodes = temp;
        }
        LOG.info("Verifying data in the table with index "+i+ " and expected nodes is "+expectedNodes);
        runVerifyCommonTable(outputDir, numReducers, expectedNodes, i);
      }
    }

    private void sleep(long ms) throws InterruptedException {
      Thread.sleep(ms);
    }

    protected void runVerifyCommonTable(String outputDir, int numReducers, long expectedNumNodes,
        int index) throws Exception {
      LOG.info("Verifying common table with index " + index);
      sleep(SLEEP_IN_MS);
      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID(); // create a random UUID.
      Path iterationOutput = new Path(outputPath, uuid.toString());
      Verify verify = new VisibilityVerify(TableName.valueOf(COMMON_TABLE_NAME).getNameAsString(),
          index);
      verify(numReducers, expectedNumNodes, iterationOutput, verify);
    }

    protected void runCopier(String outputDir) throws Exception {
      for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
        LOG.info("Running copier " + IntegrationTestBigLinkedListWithVisibility.getTableName(i));
        sleep(SLEEP_IN_MS);
        Copier copier = new Copier(IntegrationTestBigLinkedListWithVisibility.getTableName(i), i,
            false);
        copier.setConf(getConf());
        copier.runCopier(outputDir);
      }
    }

    private void verify(int numReducers, long expectedNumNodes,
        Path iterationOutput, Verify verify) throws Exception {
      verify.setConf(getConf());
      int retCode = verify.run(iterationOutput, numReducers);
      if (retCode > 0) {
        throw new RuntimeException("Verify.run failed with return code: " + retCode);
      }

      if (!verify.verify(expectedNumNodes)) {
        throw new RuntimeException("Verify.verify failed");
      }

      LOG.info("Verify finished with succees. Total nodes=" + expectedNumNodes);
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 5) {
        System.err
            .println("Usage: Loop <num iterations> " +
                "<num mappers> <num nodes per mapper> <output dir> " +
                "<num reducers> [<width> <wrap multiplier>]");
        return 1;
      }
      LOG.info("Running Loop with args:" + Arrays.deepToString(args));

      int numIterations = Integer.parseInt(args[0]);
      int numMappers = Integer.parseInt(args[1]);
      long numNodes = Long.parseLong(args[2]);
      String outputDir = args[3];
      int numReducers = Integer.parseInt(args[4]);
      Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
      Integer wrapMuplitplier = (args.length < 7) ? null : Integer.parseInt(args[6]);

      long expectedNumNodes = 0;

      if (numIterations < 0) {
        numIterations = Integer.MAX_VALUE; // run indefinitely (kind of)
      }

      for (int i = 0; i < numIterations; i++) {
        LOG.info("Starting iteration = " + i);
        LOG.info("Generating data");
        runGenerator(numMappers, numNodes, outputDir, width, wrapMuplitplier);
        expectedNumNodes += numMappers * numNodes;
        // Copying wont work because expressions are not returned back to the
        // client
        LOG.info("Running copier");
        sleep(SLEEP_IN_MS);
        runCopier(outputDir);
        LOG.info("Verifying copied data");
        sleep(SLEEP_IN_MS);
        runVerify(outputDir, numReducers, expectedNumNodes, true);
        sleep(SLEEP_IN_MS);
        for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
          LOG.info("Deleting data on table with index: "+j);
          runDelete(numMappers, numNodes, outputDir, width, wrapMuplitplier, j);
          sleep(SLEEP_IN_MS);
          LOG.info("Verifying common table after deleting");
          runVerify(outputDir, numReducers, expectedNumNodes, j);
          sleep(SLEEP_IN_MS);
        }
      }
      return 0;
    }
  }

  @Override
  @Test
  public void testContinuousIngest() throws IOException, Exception {
    // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir>
    // <num reducers>
    int ret = ToolRunner.run(
        getTestingUtil(getConf()).getConfiguration(),
        new VisibilityLoop(),
        new String[] { "1", "1", "20000",
            util.getDataTestDirOnTestFS("IntegrationTestBigLinkedListWithVisibility").toString(),
            "1", "10000" });
    org.junit.Assert.assertEquals(0, ret);
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedListWithVisibility(), args);
    System.exit(ret);
  }

  @Override
  protected MonkeyFactory getDefaultMonkeyFactory() {
    return MonkeyFactory.getFactory(MonkeyFactory.CALM);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    Tool tool = null;
    Loop loop = new VisibilityLoop();
    loop.it = this;
    tool = loop;
    return ToolRunner.run(getConf(), tool, otherArgs);
  }
}
@@ -154,40 +154,49 @@ public class Import {
             + Bytes.toString(key.get(), key.getOffset(), key.getLength()));
       }
       if (filter == null || !filter.filterRowKey(key.get(), key.getOffset(), key.getLength())) {
-        for (Cell kv : result.rawCells()) {
-          kv = filterKv(filter, kv);
-          // skip if we filter it out
-          if (kv == null) continue;
-
-          kv = convertKv(kv, cfRenameMap);
-          // Deletes and Puts are gathered and written when finished
-          if (CellUtil.isDelete(kv)) {
-            if (delete == null) {
-              delete = new Delete(key.get());
-            }
-            delete.addDeleteMarker(kv);
-          } else {
-            if (put == null) {
-              put = new Put(key.get());
-            }
-            put.add(kv);
-          }
-        }
-        if (put != null) {
-          if (durability != null) {
-            put.setDurability(durability);
-          }
-          put.setClusterIds(clusterIds);
-          context.write(key, put);
-        }
-        if (delete != null) {
-          if (durability != null) {
-            delete.setDurability(durability);
-          }
-          delete.setClusterIds(clusterIds);
-          context.write(key, delete);
-        }
+        processKV(key, result, context, put, delete);
       }
     }
 
+    protected void processKV(ImmutableBytesWritable key, Result result, Context context, Put put,
+        Delete delete) throws IOException, InterruptedException {
+      for (Cell kv : result.rawCells()) {
+        kv = filterKv(filter, kv);
+        // skip if we filter it out
+        if (kv == null) continue;
+
+        kv = convertKv(kv, cfRenameMap);
+        // Deletes and Puts are gathered and written when finished
+        if (CellUtil.isDelete(kv)) {
+          if (delete == null) {
+            delete = new Delete(key.get());
+          }
+          delete.addDeleteMarker(kv);
+        } else {
+          if (put == null) {
+            put = new Put(key.get());
+          }
+          addPutToKv(put, kv);
+        }
+      }
+      if (put != null) {
+        if (durability != null) {
+          put.setDurability(durability);
+        }
+        put.setClusterIds(clusterIds);
+        context.write(key, put);
+      }
+      if (delete != null) {
+        if (durability != null) {
+          delete.setDurability(durability);
+        }
+        delete.setClusterIds(clusterIds);
+        context.write(key, delete);
+      }
+    }
+
+    protected void addPutToKv(Put put, Cell kv) throws IOException {
+      put.add(kv);
+    }
+
     @Override
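The Import.Importer refactor above moves the per-cell bookkeeping into processKV and routes every Put through the new protected addPutToKv hook, so subclasses can decorate imported cells without copying the delete/put plumbing; VisibilityImport and VisibilityDeleteImport in the new integration test are the in-tree consumers. A short hedged sketch of another possible override (the importer class and attribute name are invented for illustration):

  static class TaggingImporter extends Import.Importer {
    @Override
    protected void addPutToKv(Put put, Cell kv) throws IOException {
      // Hypothetical decoration: tag every imported Put with the source of the import.
      put.setAttribute("import.source", Bytes.toBytes("bulk-import"));
      super.addPutToKv(put, kv);
    }
  }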