diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
index 9864031c871..19a3dbdd983 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
@@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.test;
 
 import java.io.DataInput;
 import java.io.DataOutput;
-import java.io.IOException;
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -32,6 +32,7 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.cli.CommandLine;
@@ -64,7 +65,6 @@ import org.apache.hadoop.hbase.client.BufferedMutatorParams;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionLocator;
@@ -79,6 +79,8 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
 import org.apache.hadoop.hbase.mapreduce.WALPlayer;
+import org.apache.hadoop.hbase.regionserver.FlushLargeStoresPolicy;
+import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
@@ -100,16 +102,16 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.Test;
@@ -189,6 +191,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
   protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList";
 
   protected static byte[] FAMILY_NAME = Bytes.toBytes("meta");
+  private static byte[] BIG_FAMILY_NAME = Bytes.toBytes("big");
+  private static byte[] TINY_FAMILY_NAME = Bytes.toBytes("tiny");
 
   //link to the id of the prev node in the linked list
   protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev");
@@ -237,6 +241,20 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 
     private static final Log LOG = LogFactory.getLog(Generator.class);
 
+    /**
+     * Set this configuration if you want to test single-column family flush works. If set, we will
+     * add a big column family and a small column family on either side of the usual ITBLL 'meta'
+     * column family. When we write out the ITBLL, we will also add to the big column family a value
+     * bigger than that for ITBLL and for small, something way smaller. The idea is that when
+     * flush-by-column family rather than by region is enabled, we can see if ITBLL is broken in any
+     * way. Here is how you would pass it:
+     * <p>
+     * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
+     * -Dgenerator.multiple.columnfamilies=true generator 1 10 g
+     */
+    public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY =
+      "generator.multiple.columnfamilies";
+
     static class GeneratorInputFormat extends InputFormat<BytesWritable, NullWritable> {
       static class GeneratorInputSplit extends InputSplit implements Writable {
         @Override
@@ -327,8 +345,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 
     /**
      * Some ASCII art time:
+     * <p>
      * [ . . . ] represents one batch of random longs of length WIDTH
-     *
+     * <pre>
      *                _________________________
      *               |                  ______ |
      *               |                 |      ||
@@ -348,6 +367,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
      *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^_____|||
      *             |                 |________||
      *             |___________________________|
+     * </pre>
      */
     static class GeneratorMapper
       extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
@@ -363,6 +383,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       long numNodes;
       long wrap;
       int width;
+      boolean multipleUnevenColumnFamilies;
+      byte[] tinyValue = new byte[] { 't' };
+      byte[] bigValue = null;
 
       @Override
       protected void setup(Context context) throws IOException, InterruptedException {
@@ -378,6 +401,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
         if (this.numNodes < this.wrap) {
           this.wrap = this.numNodes;
         }
+        this.multipleUnevenColumnFamilies =
+          context.getConfiguration().getBoolean(MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY, false);
       }
 
       protected void instantiateHTable() throws IOException {
@@ -403,8 +428,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
             persist(output, count, prev, current, id);
             i = 0;
 
-            if (first == null)
+            if (first == null) {
               first = current;
+            }
             prev = current;
             current = new byte[this.width][];
 
@@ -434,13 +460,25 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
           throws IOException {
         for (int i = 0; i < current.length; i++) {
           Put put = new Put(current[i]);
-          put.add(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);
+          put.addColumn(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);
 
           if (count >= 0) {
-            put.add(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
+            put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
           }
           if (id != null) {
-            put.add(FAMILY_NAME, COLUMN_CLIENT, id);
+            put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id);
+          }
+          // See if we are to write multiple columns.
+          if (this.multipleUnevenColumnFamilies) {
+            // Use any column name.
+            put.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME, this.tinyValue);
+            // If we've not allocated bigValue, do it now. Reuse same value each time.
+            if (this.bigValue == null) {
+              this.bigValue = new byte[current[i].length * 10];
+              ThreadLocalRandom.current().nextBytes(this.bigValue);
+            }
+            // Use any column name.
+            put.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME, this.bigValue);
           }
           mutator.mutate(put);
 
@@ -474,12 +512,15 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 
     protected void createSchema() throws IOException {
       Configuration conf = getConf();
-      Admin admin = new HBaseAdmin(conf);
       TableName tableName = getTableName(conf);
-      try {
+      try (Connection conn = ConnectionFactory.createConnection(conf);
+          Admin admin = conn.getAdmin()) {
         if (!admin.tableExists(tableName)) {
           HTableDescriptor htd = new HTableDescriptor(getTableName(getConf()));
           htd.addFamily(new HColumnDescriptor(FAMILY_NAME));
+          // Always add these families. Just skip writing to them when we do not test per CF flush.
+          htd.addFamily(new HColumnDescriptor(BIG_FAMILY_NAME));
+          htd.addFamily(new HColumnDescriptor(TINY_FAMILY_NAME));
           int numberOfServers = admin.getClusterStatus().getServers().size();
           if (numberOfServers == 0) {
             throw new IllegalStateException("No live regionservers");
@@ -498,8 +539,6 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       } catch (MasterNotRunningException e) {
         LOG.error("Master not running", e);
         throw new IOException(e);
-      } finally {
-        admin.close();
       }
     }
 
@@ -507,7 +546,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
         Integer width, Integer wrapMuplitplier) throws Exception {
       LOG.info("Running RandomInputGenerator with numMappers=" + numMappers
           + ", numNodes=" + numNodes);
-      Job job = new Job(getConf());
+      Job job = Job.getInstance(getConf());
       job.setJobName("Random Input Generator");
       job.setNumReduceTasks(0);
 
@@ -533,7 +572,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
         Integer width, Integer wrapMuplitplier) throws Exception {
       LOG.info("Running Generator with numMappers=" + numMappers +", numNodes=" + numNodes);
       createSchema();
-      Job job = new Job(getConf());
+      Job job = Job.getInstance(getConf());
       job.setJobName("Link Generator");
       job.setNumReduceTasks(0);
 
@@ -551,6 +590,11 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       job.setOutputFormatClass(NullOutputFormat.class);
 
       job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
+      String multipleUnevenColumnFamiliesStr = System.getProperty(MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY);
+      if (multipleUnevenColumnFamiliesStr != null) {
+        job.getConfiguration().setBoolean(MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY,
+          Boolean.parseBoolean(multipleUnevenColumnFamiliesStr));
+      }
       TableMapReduceUtil.addDependencyJars(job);
       TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
       TableMapReduceUtil.initCredentials(job);
@@ -704,9 +748,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       return result;
     }
 
-    private static SortedSet<byte []> readFileToSearch(final Configuration conf,
-        final FileSystem fs, final LocatedFileStatus keyFileStatus)
-        throws IOException, InterruptedException {
+    private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
+        final FileSystem fs, final LocatedFileStatus keyFileStatus) throws IOException,
+        InterruptedException {
       SortedSet<byte[]> result = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
       // Return entries that are flagged Counts.UNDEFINED in the value. Return the row. This is
      // what is missing.
@@ -719,13 +763,11 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
         while (rr.nextKeyValue()) {
           rr.getCurrentKey();
           BytesWritable bw = rr.getCurrentValue();
-          switch (Verify.VerifyReducer.whichType(bw.getBytes())) {
-          case UNDEFINED:
-            byte [] key = new byte [rr.getCurrentKey().getLength()];
-            System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0,
-              rr.getCurrentKey().getLength());
+          if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.Counts.UNDEFINED) {
+            byte[] key = new byte[rr.getCurrentKey().getLength()];
+            System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0, rr.getCurrentKey()
+                .getLength());
             result.add(key);
-            break;
           }
         }
       }
@@ -740,7 +782,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
   static class Verify extends Configured implements Tool {
 
     private static final Log LOG = LogFactory.getLog(Verify.class);
-    protected static final BytesWritable DEF = new BytesWritable(NO_KEY);
+    protected static final BytesWritable DEF = new BytesWritable(new byte[] { 0 });
+    protected static final BytesWritable DEF_LOST_FAMILIES = new BytesWritable(new byte[] { 1 });
 
     protected Job job;
 
@@ -748,12 +791,29 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       private BytesWritable row = new BytesWritable();
       private BytesWritable ref = new BytesWritable();
 
+      private boolean multipleUnevenColumnFamilies;
+
+      @Override
+      protected void setup(
+          Mapper<ImmutableBytesWritable, Result, BytesWritable, BytesWritable>.Context context)
+          throws IOException, InterruptedException {
+        this.multipleUnevenColumnFamilies =
+            context.getConfiguration().getBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY,
+              false);
+      }
+
       @Override
       protected void map(ImmutableBytesWritable key, Result value, Context context)
           throws IOException ,InterruptedException {
         byte[] rowKey = key.get();
         row.set(rowKey, 0, rowKey.length);
-        context.write(row, DEF);
+        if (multipleUnevenColumnFamilies
+            && (!value.containsColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME) || !value.containsColumn(
+              TINY_FAMILY_NAME, TINY_FAMILY_NAME))) {
+          context.write(row, DEF_LOST_FAMILIES);
+        } else {
+          context.write(row, DEF);
+        }
         byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV);
         if (prev != null && prev.length > 0) {
           ref.set(prev, 0, prev.length);
@@ -769,7 +829,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
      * problems found from the reducer.
      */
     public static enum Counts {
-      UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES, EXTRA_UNDEF_REFERENCES
+      UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES, EXTRA_UNDEF_REFERENCES,
+      LOST_FAMILIES
     }
 
     /**
@@ -777,11 +838,13 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
      * subsequent investigative mapreduce jobs. Each emitted value is prefaced by a one byte flag
      * saying what sort of emission it is. Flag is the Count enum ordinal as a short.
      */
-    public static class VerifyReducer
-        extends Reducer<BytesWritable,BytesWritable,BytesWritable,BytesWritable> {
+    public static class VerifyReducer extends
+        Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {
       private ArrayList<byte[]> refs = new ArrayList<byte[]>();
-      private final BytesWritable UNREF =
-          new BytesWritable(addPrefixFlag(Counts.UNREFERENCED.ordinal(), new byte [] {}));
+      private final BytesWritable UNREF = new BytesWritable(addPrefixFlag(
+        Counts.UNREFERENCED.ordinal(), new byte[] {}));
+      private final BytesWritable LOSTFAM = new BytesWritable(addPrefixFlag(
+        Counts.LOST_FAMILIES.ordinal(), new byte[] {}));
 
       private AtomicInteger rows = new AtomicInteger(0);
       private Connection connection;
@@ -794,9 +857,12 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       }
 
       @Override
-      protected void cleanup(Reducer<BytesWritable,BytesWritable,BytesWritable,BytesWritable>.Context context)
-          throws IOException, InterruptedException {
-        if (this.connection != null) this.connection.close();
+      protected void cleanup(
+          Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
+          throws IOException, InterruptedException {
+        if (this.connection != null) {
+          this.connection.close();
+        }
         super.cleanup(context);
       }
 
@@ -806,12 +872,12 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
        * @return Return new byte array that has <code>ordinal</code> as prefix on front taking up
        * Bytes.SIZEOF_SHORT bytes followed by <code>r</code>
        */
-      public static byte [] addPrefixFlag(final int ordinal, final byte [] r) {
-        byte [] prefix = Bytes.toBytes((short)ordinal);
+      public static byte[] addPrefixFlag(final int ordinal, final byte [] r) {
+        byte[] prefix = Bytes.toBytes((short)ordinal);
         if (prefix.length != Bytes.SIZEOF_SHORT) {
           throw new RuntimeException("Unexpected size: " + prefix.length);
         }
-        byte [] result = new byte [prefix.length + r.length];
+        byte[] result = new byte[prefix.length + r.length];
         System.arraycopy(prefix, 0, result, 0, prefix.length);
         System.arraycopy(r, 0, result, prefix.length, r.length);
         return result;
@@ -831,21 +897,24 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
        * @param bw
       * @return Row bytes minus the type flag.
       */
-      public static byte [] getRowOnly(BytesWritable bw) {
-        byte [] bytes = new byte [bw.getLength() - Bytes.SIZEOF_SHORT];
+      public static byte[] getRowOnly(BytesWritable bw) {
+        byte[] bytes = new byte [bw.getLength() - Bytes.SIZEOF_SHORT];
         System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0, bytes.length);
         return bytes;
       }
 
       @Override
       public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
-      throws IOException, InterruptedException {
-
+          throws IOException, InterruptedException {
         int defCount = 0;
+        boolean lostFamilies = false;
         refs.clear();
         for (BytesWritable type : values) {
           if (type.getLength() == DEF.getLength()) {
             defCount++;
+            if (type.getBytes()[0] == 1) {
+              lostFamilies = true;
+            }
           } else {
             byte[] bytes = new byte[type.getLength()];
             System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength());
@@ -861,13 +930,18 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
           LOG.error("LinkedListError: key=" + keyString + ", reference(s)=" +
             (refsSb != null? refsSb.toString(): ""));
         }
+        if (lostFamilies) {
+          LOG.error("LinkedListError: key=" + keyString + ", lost big or tiny families");
+          context.getCounter(Counts.LOST_FAMILIES).increment(1);
+          context.write(key, LOSTFAM);
+        }
 
         if (defCount == 0 && refs.size() > 0) {
           // This is bad, found a node that is referenced but not defined. It must have been
          // lost, emit some info about this node for debugging purposes.
          // Write out a line per reference. If more than one, flag it.;
           for (int i = 0; i < refs.size(); i++) {
-            byte [] bs = refs.get(i);
+            byte[] bs = refs.get(i);
             int ordinal;
             if (i <= 0) {
               ordinal = Counts.UNDEFINED.ordinal();
@@ -963,16 +1037,16 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 
     @Override
     public int run(String[] args) throws Exception {
-
       if (args.length != 2) {
-        System.out.println("Usage : " + Verify.class.getSimpleName() + " <output dir> <num reducers>");
+        System.out.println("Usage : " + Verify.class.getSimpleName()
+            + " <output dir> <num reducers>");
         return 0;
       }
 
       String outputDir = args[0];
       int numReducers = Integer.parseInt(args[1]);
 
-       return run(outputDir, numReducers);
+      return run(outputDir, numReducers);
     }
 
     public int run(String outputDir, int numReducers) throws Exception {
@@ -982,7 +1056,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 
     public int run(Path outputDir, int numReducers) throws Exception {
       LOG.info("Running Verify with outputDir=" + outputDir +", numReducers=" + numReducers);
-      job = new Job(getConf());
+      job = Job.getInstance(getConf());
       job.setJobName("Link Verifier");
       job.setNumReduceTasks(numReducers);
@@ -994,6 +1068,11 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       scan.addColumn(FAMILY_NAME, COLUMN_PREV);
       scan.setCaching(10000);
       scan.setCacheBlocks(false);
+      if (isMultiUnevenColumnFamilies()) {
+        scan.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME);
+        scan.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME);
+        job.getConfiguration().setBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY, true);
+      }
 
       TableMapReduceUtil.initTableMapperJob(getTableName(getConf()).getName(), scan,
           VerifyMapper.class, BytesWritable.class, BytesWritable.class, job);
@@ -1012,7 +1091,6 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       return success ? 0 : 1;
     }
 
-    @SuppressWarnings("deprecation")
     public boolean verify(long expectedReferenced) throws Exception {
       if (job == null) {
         throw new IllegalStateException("You should call run() first");
@@ -1024,6 +1102,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       Counter unreferenced = counters.findCounter(Counts.UNREFERENCED);
       Counter undefined = counters.findCounter(Counts.UNDEFINED);
       Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES);
+      Counter lostfamilies = counters.findCounter(Counts.LOST_FAMILIES);
 
       boolean success = true;
       //assert
@@ -1045,6 +1124,11 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
         success = false;
       }
 
+      if (lostfamilies.getValue() > 0) {
+        LOG.error("Found nodes which lost big or tiny families, count=" + lostfamilies.getValue());
+        success = false;
+      }
+
       if (!success) {
         handleFailure(counters);
       }
@@ -1358,17 +1442,17 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       Path p = new Path(args[0]);
       Configuration conf = getConf();
       TableName tableName = getTableName(conf);
+      try (FileSystem fs = HFileSystem.get(conf);
+          Connection conn = ConnectionFactory.createConnection(conf);
+          Admin admin = conn.getAdmin()) {
+        if (admin.tableExists(tableName)) {
+          admin.disableTable(tableName);
+          admin.deleteTable(tableName);
+        }
 
-      FileSystem fs = HFileSystem.get(conf);
-      Admin admin = new HBaseAdmin(conf);
-
-      if (admin.tableExists(tableName)) {
-        admin.disableTable(tableName);
-        admin.deleteTable(tableName);
-      }
-
-      if (fs.exists(p)) {
-        fs.delete(p, true);
+        if (fs.exists(p)) {
+          fs.delete(p, true);
+        }
       }
 
       return 0;
@@ -1420,12 +1504,22 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
     }
   }
 
+  private static boolean isMultiUnevenColumnFamilies() {
+    return Boolean.TRUE.toString().equalsIgnoreCase(
+      System.getProperty(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY));
+  }
+
   @Test
   public void testContinuousIngest() throws IOException, Exception {
     //Loop
-    int ret = ToolRunner.run(getTestingUtil(getConf()).getConfiguration(), new Loop(),
-        new String[] {"1", "1", "2000000",
-            util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1"});
+    Configuration conf = getTestingUtil(getConf()).getConfiguration();
+    if (isMultiUnevenColumnFamilies()) {
+      // make sure per CF flush is on
+      conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
+    }
+    int ret =
+        ToolRunner.run(conf, new Loop(), new String[] { "1", "1", "2000000",
+          util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1" });
 
     org.junit.Assert.assertEquals(0, ret);
   }
@@ -1468,7 +1562,6 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 
   @Override
   public int runTestFromCommandLine() throws Exception {
-
     Tool tool = null;
     if (toRun.equalsIgnoreCase("Generator")) {
       tool = new Generator();
@@ -1504,7 +1597,12 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 
   @Override
   protected Set<String> getColumnFamilies() {
-    return Sets.newHashSet(Bytes.toString(FAMILY_NAME));
+    if (isMultiUnevenColumnFamilies()) {
+      return Sets.newHashSet(Bytes.toString(FAMILY_NAME), Bytes.toString(BIG_FAMILY_NAME),
+        Bytes.toString(TINY_FAMILY_NAME));
+    } else {
+      return Sets.newHashSet(Bytes.toString(FAMILY_NAME));
+    }
   }
 
   private static void setJobConf(Job job, int numMappers, long numNodes,
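
Reviewer note on exercising this patch: below is a minimal sketch of how the new switch is meant to be driven, mirroring the javadoc example above. The generator arguments ("generator 1 10 g") come from that javadoc; the verify output directory (/tmp/itbll-verify) and single reducer are arbitrary examples following the "<output dir> <num reducers>" usage string touched by this change:

  $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList \
      -Dgenerator.multiple.columnfamilies=true generator 1 10 g
  $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList \
      -Dgenerator.multiple.columnfamilies=true verify /tmp/itbll-verify 1

testContinuousIngest() turns on flush-by-column-family programmatically via FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY; on a long-running cluster the equivalent hbase-site.xml entry would look like the following, assuming that constant resolves to "hbase.regionserver.flush.policy" (check the constant in your branch if it differs):

  <property>
    <name>hbase.regionserver.flush.policy</name>
    <value>org.apache.hadoop.hbase.regionserver.FlushLargeStoresPolicy</value>
  </property>

A run is healthy when Verify reports zero UNDEFINED, UNREFERENCED, and LOST_FAMILIES counters; a non-zero LOST_FAMILIES count means some row came back without its 'big' or 'tiny' cell, i.e. a column family was dropped somewhere across a flush.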