HBASE-13187 Add ITBLL that exercises per CF flush
Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
4df24b8e6d
commit
7314645972
|
@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.test;
|
|||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
@ -32,6 +32,7 @@ import java.util.Set;
|
|||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
|
@ -68,7 +69,6 @@ import org.apache.hadoop.hbase.client.BufferedMutatorParams;
|
|||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||
|
@ -82,6 +82,8 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
|
|||
import org.apache.hadoop.hbase.mapreduce.TableMapper;
|
||||
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
|
||||
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
|
||||
import org.apache.hadoop.hbase.regionserver.FlushLargeStoresPolicy;
|
||||
import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
|
||||
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
|
||||
|
@ -103,16 +105,16 @@ import org.apache.hadoop.mapreduce.RecordReader;
|
|||
import org.apache.hadoop.mapreduce.Reducer;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
|
||||
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
|
||||
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
|
||||
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.junit.Test;
|
||||
|
@ -192,6 +194,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList";
|
||||
|
||||
protected static byte[] FAMILY_NAME = Bytes.toBytes("meta");
|
||||
private static byte[] BIG_FAMILY_NAME = Bytes.toBytes("big");
|
||||
private static byte[] TINY_FAMILY_NAME = Bytes.toBytes("tiny");
|
||||
|
||||
//link to the id of the prev node in the linked list
|
||||
protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev");
|
||||
|
@ -240,6 +244,20 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
|
||||
private static final Log LOG = LogFactory.getLog(Generator.class);
|
||||
|
||||
/**
|
||||
* Set this configuration if you want to test single-column family flush works. If set, we will
|
||||
* add a big column family and a small column family on either side of the usual ITBLL 'meta'
|
||||
* column family. When we write out the ITBLL, we will also add to the big column family a value
|
||||
* bigger than that for ITBLL and for small, something way smaller. The idea is that when
|
||||
* flush-by-column family rather than by region is enabled, we can see if ITBLL is broke in any
|
||||
* way. Here is how you would pass it:
|
||||
* <p>
|
||||
* $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
|
||||
* -Dgenerator.multiple.columnfamilies=true generator 1 10 g
|
||||
*/
|
||||
public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY =
|
||||
"generator.multiple.columnfamilies";
|
||||
|
||||
static class GeneratorInputFormat extends InputFormat<BytesWritable,NullWritable> {
|
||||
static class GeneratorInputSplit extends InputSplit implements Writable {
|
||||
@Override
|
||||
|
@ -330,8 +348,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
|
||||
/**
|
||||
* Some ASCII art time:
|
||||
* <p>
|
||||
* [ . . . ] represents one batch of random longs of length WIDTH
|
||||
*
|
||||
* <pre>
|
||||
* _________________________
|
||||
* | ______ |
|
||||
* | | ||
|
||||
|
@ -351,6 +370,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
* ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^_____|||
|
||||
* | |________||
|
||||
* |___________________________|
|
||||
* </pre>
|
||||
*/
|
||||
static class GeneratorMapper
|
||||
extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
|
||||
|
@ -366,6 +386,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
long numNodes;
|
||||
long wrap;
|
||||
int width;
|
||||
boolean multipleUnevenColumnFamilies;
|
||||
byte[] tinyValue = new byte[] { 't' };
|
||||
byte[] bigValue = null;
|
||||
|
||||
@Override
|
||||
protected void setup(Context context) throws IOException, InterruptedException {
|
||||
|
@ -382,6 +405,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
if (this.numNodes < this.wrap) {
|
||||
this.wrap = this.numNodes;
|
||||
}
|
||||
this.multipleUnevenColumnFamilies =
|
||||
context.getConfiguration().getBoolean(MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY, false);
|
||||
}
|
||||
|
||||
protected void instantiateHTable() throws IOException {
|
||||
|
@ -407,8 +432,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
persist(output, count, prev, current, id);
|
||||
i = 0;
|
||||
|
||||
if (first == null)
|
||||
if (first == null) {
|
||||
first = current;
|
||||
}
|
||||
prev = current;
|
||||
current = new byte[this.width][];
|
||||
|
||||
|
@ -438,13 +464,25 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
throws IOException {
|
||||
for (int i = 0; i < current.length; i++) {
|
||||
Put put = new Put(current[i]);
|
||||
put.add(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);
|
||||
put.addColumn(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);
|
||||
|
||||
if (count >= 0) {
|
||||
put.add(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
|
||||
put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
|
||||
}
|
||||
if (id != null) {
|
||||
put.add(FAMILY_NAME, COLUMN_CLIENT, id);
|
||||
put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id);
|
||||
}
|
||||
// See if we are to write multiple columns.
|
||||
if (this.multipleUnevenColumnFamilies) {
|
||||
// Use any column name.
|
||||
put.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME, this.tinyValue);
|
||||
// If we've not allocated bigValue, do it now. Reuse same value each time.
|
||||
if (this.bigValue == null) {
|
||||
this.bigValue = new byte[current[i].length * 10];
|
||||
ThreadLocalRandom.current().nextBytes(this.bigValue);
|
||||
}
|
||||
// Use any column name.
|
||||
put.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME, this.bigValue);
|
||||
}
|
||||
mutator.mutate(put);
|
||||
|
||||
|
@ -478,12 +516,15 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
|
||||
protected void createSchema() throws IOException {
|
||||
Configuration conf = getConf();
|
||||
Admin admin = new HBaseAdmin(conf);
|
||||
TableName tableName = getTableName(conf);
|
||||
try {
|
||||
try (Connection conn = ConnectionFactory.createConnection(conf);
|
||||
Admin admin = conn.getAdmin()) {
|
||||
if (!admin.tableExists(tableName)) {
|
||||
HTableDescriptor htd = new HTableDescriptor(getTableName(getConf()));
|
||||
htd.addFamily(new HColumnDescriptor(FAMILY_NAME));
|
||||
// Always add these families. Just skip writing to them when we do not test per CF flush.
|
||||
htd.addFamily(new HColumnDescriptor(BIG_FAMILY_NAME));
|
||||
htd.addFamily(new HColumnDescriptor(TINY_FAMILY_NAME));
|
||||
int numberOfServers = admin.getClusterStatus().getServers().size();
|
||||
if (numberOfServers == 0) {
|
||||
throw new IllegalStateException("No live regionservers");
|
||||
|
@ -502,8 +543,6 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
} catch (MasterNotRunningException e) {
|
||||
LOG.error("Master not running", e);
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
admin.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -511,7 +550,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
Integer width, Integer wrapMuplitplier) throws Exception {
|
||||
LOG.info("Running RandomInputGenerator with numMappers=" + numMappers
|
||||
+ ", numNodes=" + numNodes);
|
||||
Job job = new Job(getConf());
|
||||
Job job = Job.getInstance(getConf());
|
||||
|
||||
job.setJobName("Random Input Generator");
|
||||
job.setNumReduceTasks(0);
|
||||
|
@ -537,7 +576,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
Integer width, Integer wrapMuplitplier) throws Exception {
|
||||
LOG.info("Running Generator with numMappers=" + numMappers +", numNodes=" + numNodes);
|
||||
createSchema();
|
||||
Job job = new Job(getConf());
|
||||
Job job = Job.getInstance(getConf());
|
||||
|
||||
job.setJobName("Link Generator");
|
||||
job.setNumReduceTasks(0);
|
||||
|
@ -555,6 +594,11 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
job.setOutputFormatClass(NullOutputFormat.class);
|
||||
|
||||
job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
|
||||
String multipleUnevenColumnFamiliesStr = System.getProperty(MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY);
|
||||
if (multipleUnevenColumnFamiliesStr != null) {
|
||||
job.getConfiguration().setBoolean(MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY,
|
||||
Boolean.parseBoolean(multipleUnevenColumnFamiliesStr));
|
||||
}
|
||||
TableMapReduceUtil.addDependencyJars(job);
|
||||
TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
|
||||
TableMapReduceUtil.initCredentials(job);
|
||||
|
@ -708,9 +752,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
return result;
|
||||
}
|
||||
|
||||
private static SortedSet<byte []> readFileToSearch(final Configuration conf,
|
||||
final FileSystem fs, final LocatedFileStatus keyFileStatus)
|
||||
throws IOException, InterruptedException {
|
||||
private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
|
||||
final FileSystem fs, final LocatedFileStatus keyFileStatus) throws IOException,
|
||||
InterruptedException {
|
||||
SortedSet<byte []> result = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
|
||||
// Return entries that are flagged Counts.UNDEFINED in the value. Return the row. This is
|
||||
// what is missing.
|
||||
|
@ -723,13 +767,11 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
while (rr.nextKeyValue()) {
|
||||
rr.getCurrentKey();
|
||||
BytesWritable bw = rr.getCurrentValue();
|
||||
switch (Verify.VerifyReducer.whichType(bw.getBytes())) {
|
||||
case UNDEFINED:
|
||||
byte [] key = new byte [rr.getCurrentKey().getLength()];
|
||||
System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0,
|
||||
rr.getCurrentKey().getLength());
|
||||
if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.Counts.UNDEFINED) {
|
||||
byte[] key = new byte[rr.getCurrentKey().getLength()];
|
||||
System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0, rr.getCurrentKey()
|
||||
.getLength());
|
||||
result.add(key);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -744,7 +786,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
static class Verify extends Configured implements Tool {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(Verify.class);
|
||||
protected static final BytesWritable DEF = new BytesWritable(NO_KEY);
|
||||
protected static final BytesWritable DEF = new BytesWritable(new byte[] { 0 });
|
||||
protected static final BytesWritable DEF_LOST_FAMILIES = new BytesWritable(new byte[] { 1 });
|
||||
|
||||
protected Job job;
|
||||
|
||||
|
@ -752,12 +795,29 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
private BytesWritable row = new BytesWritable();
|
||||
private BytesWritable ref = new BytesWritable();
|
||||
|
||||
private boolean multipleUnevenColumnFamilies;
|
||||
|
||||
@Override
|
||||
protected void setup(
|
||||
Mapper<ImmutableBytesWritable, Result, BytesWritable, BytesWritable>.Context context)
|
||||
throws IOException, InterruptedException {
|
||||
this.multipleUnevenColumnFamilies =
|
||||
context.getConfiguration().getBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY,
|
||||
false);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void map(ImmutableBytesWritable key, Result value, Context context)
|
||||
throws IOException ,InterruptedException {
|
||||
byte[] rowKey = key.get();
|
||||
row.set(rowKey, 0, rowKey.length);
|
||||
context.write(row, DEF);
|
||||
if (multipleUnevenColumnFamilies
|
||||
&& (!value.containsColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME) || !value.containsColumn(
|
||||
TINY_FAMILY_NAME, TINY_FAMILY_NAME))) {
|
||||
context.write(row, DEF_LOST_FAMILIES);
|
||||
} else {
|
||||
context.write(row, DEF);
|
||||
}
|
||||
byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV);
|
||||
if (prev != null && prev.length > 0) {
|
||||
ref.set(prev, 0, prev.length);
|
||||
|
@ -773,7 +833,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
* problems found from the reducer.
|
||||
*/
|
||||
public static enum Counts {
|
||||
UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES, EXTRA_UNDEF_REFERENCES
|
||||
UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES, EXTRA_UNDEF_REFERENCES,
|
||||
LOST_FAMILIES
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -781,11 +842,13 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
* subsequent investigative mapreduce jobs. Each emitted value is prefaced by a one byte flag
|
||||
* saying what sort of emission it is. Flag is the Count enum ordinal as a short.
|
||||
*/
|
||||
public static class VerifyReducer
|
||||
extends Reducer<BytesWritable,BytesWritable,BytesWritable,BytesWritable> {
|
||||
public static class VerifyReducer extends
|
||||
Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {
|
||||
private ArrayList<byte[]> refs = new ArrayList<byte[]>();
|
||||
private final BytesWritable UNREF =
|
||||
new BytesWritable(addPrefixFlag(Counts.UNREFERENCED.ordinal(), new byte [] {}));
|
||||
private final BytesWritable UNREF = new BytesWritable(addPrefixFlag(
|
||||
Counts.UNREFERENCED.ordinal(), new byte[] {}));
|
||||
private final BytesWritable LOSTFAM = new BytesWritable(addPrefixFlag(
|
||||
Counts.LOST_FAMILIES.ordinal(), new byte[] {}));
|
||||
|
||||
private AtomicInteger rows = new AtomicInteger(0);
|
||||
private Connection connection;
|
||||
|
@ -798,9 +861,12 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void cleanup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
|
||||
throws IOException, InterruptedException {
|
||||
if (this.connection != null) this.connection.close();
|
||||
protected void cleanup(
|
||||
Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
|
||||
throws IOException, InterruptedException {
|
||||
if (this.connection != null) {
|
||||
this.connection.close();
|
||||
}
|
||||
super.cleanup(context);
|
||||
}
|
||||
|
||||
|
@ -810,12 +876,12 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
* @return Return new byte array that has <code>ordinal</code> as prefix on front taking up
|
||||
* Bytes.SIZEOF_SHORT bytes followed by <code>r</code>
|
||||
*/
|
||||
public static byte [] addPrefixFlag(final int ordinal, final byte [] r) {
|
||||
byte [] prefix = Bytes.toBytes((short)ordinal);
|
||||
public static byte[] addPrefixFlag(final int ordinal, final byte [] r) {
|
||||
byte[] prefix = Bytes.toBytes((short)ordinal);
|
||||
if (prefix.length != Bytes.SIZEOF_SHORT) {
|
||||
throw new RuntimeException("Unexpected size: " + prefix.length);
|
||||
}
|
||||
byte [] result = new byte [prefix.length + r.length];
|
||||
byte[] result = new byte[prefix.length + r.length];
|
||||
System.arraycopy(prefix, 0, result, 0, prefix.length);
|
||||
System.arraycopy(r, 0, result, prefix.length, r.length);
|
||||
return result;
|
||||
|
@ -835,21 +901,24 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
* @param bw
|
||||
* @return Row bytes minus the type flag.
|
||||
*/
|
||||
public static byte [] getRowOnly(BytesWritable bw) {
|
||||
byte [] bytes = new byte [bw.getLength() - Bytes.SIZEOF_SHORT];
|
||||
public static byte[] getRowOnly(BytesWritable bw) {
|
||||
byte[] bytes = new byte [bw.getLength() - Bytes.SIZEOF_SHORT];
|
||||
System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0, bytes.length);
|
||||
return bytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
|
||||
throws IOException, InterruptedException {
|
||||
|
||||
throws IOException, InterruptedException {
|
||||
int defCount = 0;
|
||||
boolean lostFamilies = false;
|
||||
refs.clear();
|
||||
for (BytesWritable type : values) {
|
||||
if (type.getLength() == DEF.getLength()) {
|
||||
defCount++;
|
||||
if (type.getBytes()[0] == 1) {
|
||||
lostFamilies = true;
|
||||
}
|
||||
} else {
|
||||
byte[] bytes = new byte[type.getLength()];
|
||||
System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength());
|
||||
|
@ -865,13 +934,18 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
LOG.error("LinkedListError: key=" + keyString + ", reference(s)=" +
|
||||
(refsSb != null? refsSb.toString(): ""));
|
||||
}
|
||||
if (lostFamilies) {
|
||||
LOG.error("LinkedListError: key=" + keyString + ", lost big or tiny families");
|
||||
context.getCounter(Counts.LOST_FAMILIES).increment(1);
|
||||
context.write(key, LOSTFAM);
|
||||
}
|
||||
|
||||
if (defCount == 0 && refs.size() > 0) {
|
||||
// This is bad, found a node that is referenced but not defined. It must have been
|
||||
// lost, emit some info about this node for debugging purposes.
|
||||
// Write out a line per reference. If more than one, flag it.;
|
||||
for (int i = 0; i < refs.size(); i++) {
|
||||
byte [] bs = refs.get(i);
|
||||
byte[] bs = refs.get(i);
|
||||
int ordinal;
|
||||
if (i <= 0) {
|
||||
ordinal = Counts.UNDEFINED.ordinal();
|
||||
|
@ -967,16 +1041,16 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
|
||||
@Override
|
||||
public int run(String[] args) throws Exception {
|
||||
|
||||
if (args.length != 2) {
|
||||
System.out.println("Usage : " + Verify.class.getSimpleName() + " <output dir> <num reducers>");
|
||||
System.out.println("Usage : " + Verify.class.getSimpleName()
|
||||
+ " <output dir> <num reducers>");
|
||||
return 0;
|
||||
}
|
||||
|
||||
String outputDir = args[0];
|
||||
int numReducers = Integer.parseInt(args[1]);
|
||||
|
||||
return run(outputDir, numReducers);
|
||||
return run(outputDir, numReducers);
|
||||
}
|
||||
|
||||
public int run(String outputDir, int numReducers) throws Exception {
|
||||
|
@ -986,7 +1060,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
public int run(Path outputDir, int numReducers) throws Exception {
|
||||
LOG.info("Running Verify with outputDir=" + outputDir +", numReducers=" + numReducers);
|
||||
|
||||
job = new Job(getConf());
|
||||
job = Job.getInstance(getConf());
|
||||
|
||||
job.setJobName("Link Verifier");
|
||||
job.setNumReduceTasks(numReducers);
|
||||
|
@ -998,6 +1072,11 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
scan.addColumn(FAMILY_NAME, COLUMN_PREV);
|
||||
scan.setCaching(10000);
|
||||
scan.setCacheBlocks(false);
|
||||
if (isMultiUnevenColumnFamilies()) {
|
||||
scan.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME);
|
||||
scan.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME);
|
||||
job.getConfiguration().setBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY, true);
|
||||
}
|
||||
|
||||
TableMapReduceUtil.initTableMapperJob(getTableName(getConf()).getName(), scan,
|
||||
VerifyMapper.class, BytesWritable.class, BytesWritable.class, job);
|
||||
|
@ -1016,7 +1095,6 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
return success ? 0 : 1;
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public boolean verify(long expectedReferenced) throws Exception {
|
||||
if (job == null) {
|
||||
throw new IllegalStateException("You should call run() first");
|
||||
|
@ -1028,6 +1106,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
Counter unreferenced = counters.findCounter(Counts.UNREFERENCED);
|
||||
Counter undefined = counters.findCounter(Counts.UNDEFINED);
|
||||
Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES);
|
||||
Counter lostfamilies = counters.findCounter(Counts.LOST_FAMILIES);
|
||||
|
||||
boolean success = true;
|
||||
//assert
|
||||
|
@ -1049,6 +1128,11 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
success = false;
|
||||
}
|
||||
|
||||
if (lostfamilies.getValue() > 0) {
|
||||
LOG.error("Found nodes which lost big or tiny families, count=" + lostfamilies.getValue());
|
||||
success = false;
|
||||
}
|
||||
|
||||
if (!success) {
|
||||
handleFailure(counters);
|
||||
}
|
||||
|
@ -1357,17 +1441,17 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
Path p = new Path(args[0]);
|
||||
Configuration conf = getConf();
|
||||
TableName tableName = getTableName(conf);
|
||||
try (FileSystem fs = HFileSystem.get(conf);
|
||||
Connection conn = ConnectionFactory.createConnection(conf);
|
||||
Admin admin = conn.getAdmin()) {
|
||||
if (admin.tableExists(tableName)) {
|
||||
admin.disableTable(tableName);
|
||||
admin.deleteTable(tableName);
|
||||
}
|
||||
|
||||
FileSystem fs = HFileSystem.get(conf);
|
||||
Admin admin = new HBaseAdmin(conf);
|
||||
|
||||
if (admin.tableExists(tableName)) {
|
||||
admin.disableTable(tableName);
|
||||
admin.deleteTable(tableName);
|
||||
}
|
||||
|
||||
if (fs.exists(p)) {
|
||||
fs.delete(p, true);
|
||||
if (fs.exists(p)) {
|
||||
fs.delete(p, true);
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -1419,12 +1503,22 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
private static boolean isMultiUnevenColumnFamilies() {
|
||||
return Boolean.TRUE.toString().equalsIgnoreCase(
|
||||
System.getProperty(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testContinuousIngest() throws IOException, Exception {
|
||||
//Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers>
|
||||
int ret = ToolRunner.run(getTestingUtil(getConf()).getConfiguration(), new Loop(),
|
||||
new String[] {"1", "1", "2000000",
|
||||
util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1"});
|
||||
Configuration conf = getTestingUtil(getConf()).getConfiguration();
|
||||
if (isMultiUnevenColumnFamilies()) {
|
||||
// make sure per CF flush is on
|
||||
conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
|
||||
}
|
||||
int ret =
|
||||
ToolRunner.run(conf, new Loop(), new String[] { "1", "1", "2000000",
|
||||
util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1" });
|
||||
org.junit.Assert.assertEquals(0, ret);
|
||||
}
|
||||
|
||||
|
@ -1467,7 +1561,6 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
|
||||
@Override
|
||||
public int runTestFromCommandLine() throws Exception {
|
||||
|
||||
Tool tool = null;
|
||||
if (toRun.equalsIgnoreCase("Generator")) {
|
||||
tool = new Generator();
|
||||
|
@ -1503,7 +1596,12 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
|||
|
||||
@Override
|
||||
protected Set<String> getColumnFamilies() {
|
||||
return Sets.newHashSet(Bytes.toString(FAMILY_NAME));
|
||||
if (isMultiUnevenColumnFamilies()) {
|
||||
return Sets.newHashSet(Bytes.toString(FAMILY_NAME), Bytes.toString(BIG_FAMILY_NAME),
|
||||
Bytes.toString(TINY_FAMILY_NAME));
|
||||
} else {
|
||||
return Sets.newHashSet(Bytes.toString(FAMILY_NAME));
|
||||
}
|
||||
}
|
||||
|
||||
private static void setJobConf(Job job, int numMappers, long numNodes,
|
||||
|
|
Loading…
Reference in New Issue