HBASE-13187 Add ITBLL that exercises per CF flush

Signed-off-by: stack <stack@apache.org>
zhangduo 2015-04-01 20:56:17 +08:00 committed by stack
parent 874aa9eb85
commit f1b53d71b8
1 changed file with 160 additions and 62 deletions

org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java

@@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.test;
 import java.io.DataInput;
 import java.io.DataOutput;
-import java.io.IOException;
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -32,6 +32,7 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.cli.CommandLine;
@@ -64,7 +65,6 @@ import org.apache.hadoop.hbase.client.BufferedMutatorParams;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionLocator;
@@ -79,6 +79,8 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
 import org.apache.hadoop.hbase.mapreduce.WALPlayer;
+import org.apache.hadoop.hbase.regionserver.FlushLargeStoresPolicy;
+import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
@@ -100,16 +102,16 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.Test;
@@ -189,6 +191,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList";
 protected static byte[] FAMILY_NAME = Bytes.toBytes("meta");
+private static byte[] BIG_FAMILY_NAME = Bytes.toBytes("big");
+private static byte[] TINY_FAMILY_NAME = Bytes.toBytes("tiny");
 //link to the id of the prev node in the linked list
 protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev");
@@ -237,6 +241,20 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 private static final Log LOG = LogFactory.getLog(Generator.class);
+/**
+ * Set this configuration if you want to test single-column family flush works. If set, we will
+ * add a big column family and a small column family on either side of the usual ITBLL 'meta'
+ * column family. When we write out the ITBLL, we will also add to the big column family a value
+ * bigger than that for ITBLL and for small, something way smaller. The idea is that when
+ * flush-by-column family rather than by region is enabled, we can see if ITBLL is broke in any
+ * way. Here is how you would pass it:
+ * <p>
+ * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
+ * -Dgenerator.multiple.columnfamilies=true generator 1 10 g
+ */
+public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY =
+  "generator.multiple.columnfamilies";
 static class GeneratorInputFormat extends InputFormat<BytesWritable,NullWritable> {
 static class GeneratorInputSplit extends InputSplit implements Writable {
 @Override
@@ -327,8 +345,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 /**
 * Some ASCII art time:
+ * <p>
 * [ . . . ] represents one batch of random longs of length WIDTH
- *
+ * <pre>
 * _________________________
 * | ______ |
 * | | ||
@@ -348,6 +367,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 * ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^_____|||
 * | |________||
 * |___________________________|
+ * </pre>
 */
 static class GeneratorMapper
 extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
@@ -363,6 +383,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 long numNodes;
 long wrap;
 int width;
+boolean multipleUnevenColumnFamilies;
+byte[] tinyValue = new byte[] { 't' };
+byte[] bigValue = null;
 @Override
 protected void setup(Context context) throws IOException, InterruptedException {
@@ -378,6 +401,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 if (this.numNodes < this.wrap) {
 this.wrap = this.numNodes;
 }
+this.multipleUnevenColumnFamilies =
+  context.getConfiguration().getBoolean(MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY, false);
 }
 protected void instantiateHTable() throws IOException {
@@ -403,8 +428,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 persist(output, count, prev, current, id);
 i = 0;
-if (first == null)
+if (first == null) {
 first = current;
+}
 prev = current;
 current = new byte[this.width][];
@@ -434,13 +460,25 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 throws IOException {
 for (int i = 0; i < current.length; i++) {
 Put put = new Put(current[i]);
-put.add(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);
+put.addColumn(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);
 if (count >= 0) {
-put.add(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
+put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
 }
 if (id != null) {
-put.add(FAMILY_NAME, COLUMN_CLIENT, id);
+put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id);
+}
+// See if we are to write multiple columns.
+if (this.multipleUnevenColumnFamilies) {
+// Use any column name.
+put.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME, this.tinyValue);
+// If we've not allocated bigValue, do it now. Reuse same value each time.
+if (this.bigValue == null) {
+this.bigValue = new byte[current[i].length * 10];
+ThreadLocalRandom.current().nextBytes(this.bigValue);
+}
+// Use any column name.
+put.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME, this.bigValue);
 }
 mutator.mutate(put);
@@ -474,12 +512,15 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 protected void createSchema() throws IOException {
 Configuration conf = getConf();
-Admin admin = new HBaseAdmin(conf);
 TableName tableName = getTableName(conf);
-try {
+try (Connection conn = ConnectionFactory.createConnection(conf);
+  Admin admin = conn.getAdmin()) {
 if (!admin.tableExists(tableName)) {
 HTableDescriptor htd = new HTableDescriptor(getTableName(getConf()));
 htd.addFamily(new HColumnDescriptor(FAMILY_NAME));
+// Always add these families. Just skip writing to them when we do not test per CF flush.
+htd.addFamily(new HColumnDescriptor(BIG_FAMILY_NAME));
+htd.addFamily(new HColumnDescriptor(TINY_FAMILY_NAME));
 int numberOfServers = admin.getClusterStatus().getServers().size();
 if (numberOfServers == 0) {
 throw new IllegalStateException("No live regionservers");
@@ -498,8 +539,6 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 } catch (MasterNotRunningException e) {
 LOG.error("Master not running", e);
 throw new IOException(e);
-} finally {
-admin.close();
 }
 }
@@ -507,7 +546,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 Integer width, Integer wrapMuplitplier) throws Exception {
 LOG.info("Running RandomInputGenerator with numMappers=" + numMappers
 + ", numNodes=" + numNodes);
-Job job = new Job(getConf());
+Job job = Job.getInstance(getConf());
 job.setJobName("Random Input Generator");
 job.setNumReduceTasks(0);
@@ -533,7 +572,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 Integer width, Integer wrapMuplitplier) throws Exception {
 LOG.info("Running Generator with numMappers=" + numMappers +", numNodes=" + numNodes);
 createSchema();
-Job job = new Job(getConf());
+Job job = Job.getInstance(getConf());
 job.setJobName("Link Generator");
 job.setNumReduceTasks(0);
@@ -551,6 +590,11 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 job.setOutputFormatClass(NullOutputFormat.class);
 job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
+String multipleUnevenColumnFamiliesStr = System.getProperty(MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY);
+if (multipleUnevenColumnFamiliesStr != null) {
+job.getConfiguration().setBoolean(MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY,
+  Boolean.parseBoolean(multipleUnevenColumnFamiliesStr));
+}
 TableMapReduceUtil.addDependencyJars(job);
 TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
 TableMapReduceUtil.initCredentials(job);
@@ -704,9 +748,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 return result;
 }
-private static SortedSet<byte []> readFileToSearch(final Configuration conf,
-final FileSystem fs, final LocatedFileStatus keyFileStatus)
-throws IOException, InterruptedException {
+private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
+final FileSystem fs, final LocatedFileStatus keyFileStatus) throws IOException,
+InterruptedException {
 SortedSet<byte []> result = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
 // Return entries that are flagged Counts.UNDEFINED in the value. Return the row. This is
 // what is missing.
@@ -719,13 +763,11 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 while (rr.nextKeyValue()) {
 rr.getCurrentKey();
 BytesWritable bw = rr.getCurrentValue();
-switch (Verify.VerifyReducer.whichType(bw.getBytes())) {
-case UNDEFINED:
-byte [] key = new byte [rr.getCurrentKey().getLength()];
-System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0,
-rr.getCurrentKey().getLength());
+if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.Counts.UNDEFINED) {
+byte[] key = new byte[rr.getCurrentKey().getLength()];
+System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0, rr.getCurrentKey()
+  .getLength());
 result.add(key);
-break;
 }
 }
 }
@@ -740,7 +782,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 static class Verify extends Configured implements Tool {
 private static final Log LOG = LogFactory.getLog(Verify.class);
-protected static final BytesWritable DEF = new BytesWritable(NO_KEY);
+protected static final BytesWritable DEF = new BytesWritable(new byte[] { 0 });
+protected static final BytesWritable DEF_LOST_FAMILIES = new BytesWritable(new byte[] { 1 });
 protected Job job;
@@ -748,12 +791,29 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 private BytesWritable row = new BytesWritable();
 private BytesWritable ref = new BytesWritable();
+private boolean multipleUnevenColumnFamilies;
+@Override
+protected void setup(
+  Mapper<ImmutableBytesWritable, Result, BytesWritable, BytesWritable>.Context context)
+  throws IOException, InterruptedException {
+this.multipleUnevenColumnFamilies =
+  context.getConfiguration().getBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY,
+    false);
+}
 @Override
 protected void map(ImmutableBytesWritable key, Result value, Context context)
 throws IOException ,InterruptedException {
 byte[] rowKey = key.get();
 row.set(rowKey, 0, rowKey.length);
+if (multipleUnevenColumnFamilies
+  && (!value.containsColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME) || !value.containsColumn(
+    TINY_FAMILY_NAME, TINY_FAMILY_NAME))) {
+context.write(row, DEF_LOST_FAMILIES);
+} else {
 context.write(row, DEF);
+}
 byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV);
 if (prev != null && prev.length > 0) {
 ref.set(prev, 0, prev.length);
@@ -769,7 +829,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 * problems found from the reducer.
 */
 public static enum Counts {
-UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES, EXTRA_UNDEF_REFERENCES
+UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES, EXTRA_UNDEF_REFERENCES,
+LOST_FAMILIES
 }
 /**
@@ -777,11 +838,13 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 * subsequent investigative mapreduce jobs. Each emitted value is prefaced by a one byte flag
 * saying what sort of emission it is. Flag is the Count enum ordinal as a short.
 */
-public static class VerifyReducer
-extends Reducer<BytesWritable,BytesWritable,BytesWritable,BytesWritable> {
+public static class VerifyReducer extends
+Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {
 private ArrayList<byte[]> refs = new ArrayList<byte[]>();
-private final BytesWritable UNREF =
-new BytesWritable(addPrefixFlag(Counts.UNREFERENCED.ordinal(), new byte [] {}));
+private final BytesWritable UNREF = new BytesWritable(addPrefixFlag(
+  Counts.UNREFERENCED.ordinal(), new byte[] {}));
+private final BytesWritable LOSTFAM = new BytesWritable(addPrefixFlag(
+  Counts.LOST_FAMILIES.ordinal(), new byte[] {}));
 private AtomicInteger rows = new AtomicInteger(0);
 private Connection connection;
@@ -794,9 +857,12 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 }
 @Override
-protected void cleanup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
+protected void cleanup(
+  Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
 throws IOException, InterruptedException {
-if (this.connection != null) this.connection.close();
+if (this.connection != null) {
+this.connection.close();
+}
 super.cleanup(context);
 }
@@ -806,12 +872,12 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 * @return Return new byte array that has <code>ordinal</code> as prefix on front taking up
 * Bytes.SIZEOF_SHORT bytes followed by <code>r</code>
 */
-public static byte [] addPrefixFlag(final int ordinal, final byte [] r) {
-byte [] prefix = Bytes.toBytes((short)ordinal);
+public static byte[] addPrefixFlag(final int ordinal, final byte [] r) {
+byte[] prefix = Bytes.toBytes((short)ordinal);
 if (prefix.length != Bytes.SIZEOF_SHORT) {
 throw new RuntimeException("Unexpected size: " + prefix.length);
 }
-byte [] result = new byte [prefix.length + r.length];
+byte[] result = new byte[prefix.length + r.length];
 System.arraycopy(prefix, 0, result, 0, prefix.length);
 System.arraycopy(r, 0, result, prefix.length, r.length);
 return result;
@@ -831,8 +897,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 * @param bw
 * @return Row bytes minus the type flag.
 */
-public static byte [] getRowOnly(BytesWritable bw) {
-byte [] bytes = new byte [bw.getLength() - Bytes.SIZEOF_SHORT];
+public static byte[] getRowOnly(BytesWritable bw) {
+byte[] bytes = new byte [bw.getLength() - Bytes.SIZEOF_SHORT];
 System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0, bytes.length);
 return bytes;
 }
@@ -840,12 +906,15 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 @Override
 public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
 throws IOException, InterruptedException {
 int defCount = 0;
+boolean lostFamilies = false;
 refs.clear();
 for (BytesWritable type : values) {
 if (type.getLength() == DEF.getLength()) {
 defCount++;
+if (type.getBytes()[0] == 1) {
+lostFamilies = true;
+}
 } else {
 byte[] bytes = new byte[type.getLength()];
 System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength());
@@ -861,13 +930,18 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 LOG.error("LinkedListError: key=" + keyString + ", reference(s)=" +
 (refsSb != null? refsSb.toString(): ""));
 }
+if (lostFamilies) {
+LOG.error("LinkedListError: key=" + keyString + ", lost big or tiny families");
+context.getCounter(Counts.LOST_FAMILIES).increment(1);
+context.write(key, LOSTFAM);
+}
 if (defCount == 0 && refs.size() > 0) {
 // This is bad, found a node that is referenced but not defined. It must have been
 // lost, emit some info about this node for debugging purposes.
 // Write out a line per reference. If more than one, flag it.;
 for (int i = 0; i < refs.size(); i++) {
-byte [] bs = refs.get(i);
+byte[] bs = refs.get(i);
 int ordinal;
 if (i <= 0) {
 ordinal = Counts.UNDEFINED.ordinal();
@@ -963,9 +1037,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 @Override
 public int run(String[] args) throws Exception {
 if (args.length != 2) {
-System.out.println("Usage : " + Verify.class.getSimpleName() + " <output dir> <num reducers>");
+System.out.println("Usage : " + Verify.class.getSimpleName()
+  + " <output dir> <num reducers>");
 return 0;
 }
@@ -982,7 +1056,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 public int run(Path outputDir, int numReducers) throws Exception {
 LOG.info("Running Verify with outputDir=" + outputDir +", numReducers=" + numReducers);
-job = new Job(getConf());
+job = Job.getInstance(getConf());
 job.setJobName("Link Verifier");
 job.setNumReduceTasks(numReducers);
@@ -994,6 +1068,11 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 scan.addColumn(FAMILY_NAME, COLUMN_PREV);
 scan.setCaching(10000);
 scan.setCacheBlocks(false);
+if (isMultiUnevenColumnFamilies()) {
+scan.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME);
+scan.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME);
+job.getConfiguration().setBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY, true);
+}
 TableMapReduceUtil.initTableMapperJob(getTableName(getConf()).getName(), scan,
 VerifyMapper.class, BytesWritable.class, BytesWritable.class, job);
@@ -1012,7 +1091,6 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 return success ? 0 : 1;
 }
-@SuppressWarnings("deprecation")
 public boolean verify(long expectedReferenced) throws Exception {
 if (job == null) {
 throw new IllegalStateException("You should call run() first");
@@ -1024,6 +1102,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 Counter unreferenced = counters.findCounter(Counts.UNREFERENCED);
 Counter undefined = counters.findCounter(Counts.UNDEFINED);
 Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES);
+Counter lostfamilies = counters.findCounter(Counts.LOST_FAMILIES);
 boolean success = true;
 //assert
@@ -1045,6 +1124,11 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 success = false;
 }
+if (lostfamilies.getValue() > 0) {
+LOG.error("Found nodes which lost big or tiny families, count=" + lostfamilies.getValue());
+success = false;
+}
 if (!success) {
 handleFailure(counters);
 }
@@ -1358,10 +1442,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 Path p = new Path(args[0]);
 Configuration conf = getConf();
 TableName tableName = getTableName(conf);
-FileSystem fs = HFileSystem.get(conf);
-Admin admin = new HBaseAdmin(conf);
+try (FileSystem fs = HFileSystem.get(conf);
+  Connection conn = ConnectionFactory.createConnection(conf);
+  Admin admin = conn.getAdmin()) {
 if (admin.tableExists(tableName)) {
 admin.disableTable(tableName);
 admin.deleteTable(tableName);
@@ -1370,6 +1453,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 if (fs.exists(p)) {
 fs.delete(p, true);
 }
+}
 return 0;
 }
@@ -1420,12 +1504,22 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 }
 }
+private static boolean isMultiUnevenColumnFamilies() {
+return Boolean.TRUE.toString().equalsIgnoreCase(
+  System.getProperty(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY));
+}
 @Test
 public void testContinuousIngest() throws IOException, Exception {
 //Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers>
-int ret = ToolRunner.run(getTestingUtil(getConf()).getConfiguration(), new Loop(),
-new String[] {"1", "1", "2000000",
-util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1"});
+Configuration conf = getTestingUtil(getConf()).getConfiguration();
+if (isMultiUnevenColumnFamilies()) {
+// make sure per CF flush is on
+conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
+}
+int ret =
+ToolRunner.run(conf, new Loop(), new String[] { "1", "1", "2000000",
+util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1" });
 org.junit.Assert.assertEquals(0, ret);
 }
@@ -1468,7 +1562,6 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 @Override
 public int runTestFromCommandLine() throws Exception {
 Tool tool = null;
 if (toRun.equalsIgnoreCase("Generator")) {
 tool = new Generator();
@@ -1504,8 +1597,13 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 @Override
 protected Set<String> getColumnFamilies() {
+if (isMultiUnevenColumnFamilies()) {
+return Sets.newHashSet(Bytes.toString(FAMILY_NAME), Bytes.toString(BIG_FAMILY_NAME),
+  Bytes.toString(TINY_FAMILY_NAME));
+} else {
 return Sets.newHashSet(Bytes.toString(FAMILY_NAME));
 }
+}
 private static void setJobConf(Job job, int numMappers, long numNodes,
 Integer width, Integer wrapMultiplier) {
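
For illustration only, here is a minimal sketch of how the pieces above fit together when driven outside JUnit. The property name generator.multiple.columnfamilies, the FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY / FlushLargeStoresPolicy pair, and the Loop arguments are taken from this diff; the driver class name, the /tmp/ITBLL output path, and the programmatic invocation are assumptions of the sketch, not part of the commit (the commit's own javadoc shows the equivalent bin/hbase command line). Note also that the flush-policy setting only matters for a cluster actually started from this Configuration, as in the mini-cluster test above; on a deployed cluster the policy would be set in regionserver or table configuration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.FlushLargeStoresPolicy;
import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
import org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical driver class; the class name and output path are illustrative.
public class PerCfFlushItbllDriver {
  public static void main(String[] args) throws Exception {
    // The patch reads the switch via System.getProperty (see runGenerator and
    // isMultiUnevenColumnFamilies above), so set it as a JVM system property.
    System.setProperty("generator.multiple.columnfamilies", "true");
    Configuration conf = HBaseConfiguration.create();
    // Ask for per-column-family flush decisions, mirroring testContinuousIngest.
    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
        FlushLargeStoresPolicy.class.getName());
    // Loop arguments: <num iterations> <num mappers> <num nodes per mapper>
    // <output dir> <num reducers>, the same values the JUnit test uses.
    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(),
        new String[] { "Loop", "1", "1", "2000000", "/tmp/ITBLL", "1" });
    System.exit(ret);
  }
}

The command-line equivalent is the invocation shown in the javadoc for MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY above, with Loop substituted for generator when the full generate-and-verify cycle is wanted.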