diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java index 4a3a64a821b..6e7cd33e615 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java @@ -20,15 +20,13 @@ package org.apache.hadoop.hbase; import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.TreeSet; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterManager.ServiceType; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; @@ -42,8 +40,6 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; -import com.google.common.collect.Sets; - /** * Manages the interactions with an already deployed distributed cluster (as opposed to * a pseudo-distributed, or mini/local cluster). This is used by integration and system tests. diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java index 29bb2bbc4fa..9864031c871 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java @@ -21,12 +21,16 @@ package org.apache.hadoop.hbase.test; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.FileNotFoundException; +import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Random; import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; @@ -40,10 +44,14 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.IntegrationTestBase; @@ -57,9 +65,9 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionLocator; import 
org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -70,13 +78,15 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl; +import org.apache.hadoop.hbase.mapreduce.WALPlayer; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.util.AbstractHBaseTool; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.RegionSplitter; +import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.CounterGroup; @@ -89,10 +99,15 @@ import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; @@ -382,6 +397,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { current[i] = new byte[key.getLength()]; System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength()); if (++i == current.length) { + LOG.info("Persisting current.length=" + current.length + ", count=" + count + ", id=" + + Bytes.toStringBinary(id) + ", current=" + Bytes.toStringBinary(current[0]) + + ", i=" + i); persist(output, count, prev, current, id); i = 0; @@ -473,8 +491,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { "pre-splitting table into " + totalNumberOfRegions + " regions " + "(default regions per server: " + regionsPerServer + ")"); - byte[][] splits = new RegionSplitter.UniformSplit().split( - totalNumberOfRegions); + byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions); admin.createTable(htd, splits); } @@ -563,6 +580,159 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { } } + /** + * Tool to search missing rows in WALs and hfiles. + * Pass in file or dir of keys to search for. Key file must have been written by Verify step + * (we depend on the format it writes out. We'll read them in and then search in hbase + * WALs and oldWALs dirs (Some of this is TODO). 
+ */ + static class Search extends Configured implements Tool { + private static final Log LOG = LogFactory.getLog(Search.class); + protected Job job; + + private static void printUsage(final String error) { + if (error != null && error.length() > 0) System.out.println("ERROR: " + error); + System.err.println("Usage: search []"); + } + + @Override + public int run(String[] args) throws Exception { + if (args.length < 1 || args.length > 2) { + printUsage(null); + return 1; + } + Path inputDir = new Path(args[0]); + int numMappers = 1; + if (args.length > 1) { + numMappers = Integer.parseInt(args[1]); + } + return run(inputDir, numMappers); + } + + /** + * WALPlayer override that searches for keys loaded in the setup. + */ + public static class WALSearcher extends WALPlayer { + public WALSearcher(Configuration conf) { + super(conf); + } + + /** + * The actual searcher mapper. + */ + public static class WALMapperSearcher extends WALMapper { + private SortedSet keysToFind; + + @Override + public void setup(Mapper.Context context) + throws IOException { + super.setup(context); + try { + this.keysToFind = readKeysToSearch(context.getConfiguration()); + LOG.info("Loaded keys to find: count=" + this.keysToFind.size()); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.toString()); + } + } + + @Override + protected boolean filter(Context context, Cell cell) { + // TODO: Can I do a better compare than this copying out key? + byte [] row = new byte [cell.getRowLength()]; + System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength()); + boolean b = this.keysToFind.contains(row); + if (b) { + String keyStr = Bytes.toStringBinary(row); + LOG.info("Found cell=" + cell); + context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1); + } + return b; + } + } + + // Put in place the above WALMapperSearcher. + @Override + public Job createSubmittableJob(String[] args) throws IOException { + Job job = super.createSubmittableJob(args); + // Call my class instead. + job.setJarByClass(WALMapperSearcher.class); + job.setMapperClass(WALMapperSearcher.class); + job.setOutputFormatClass(NullOutputFormat.class); + return job; + } + } + + static final String FOUND_GROUP_KEY = "Found"; + static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir"; + + public int run(Path inputDir, int numMappers) throws Exception { + getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString()); + SortedSet keys = readKeysToSearch(getConf()); + if (keys.isEmpty()) throw new RuntimeException("No keys to find"); + LOG.info("Count of keys to find: " + keys.size()); + for(byte [] key: keys) LOG.info("Key: " + Bytes.toStringBinary(key)); + Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR)); + // Now read all WALs. In two dirs. Presumes certain layout. 
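+      // i.e. the 'WALs' and 'oldWALs' subdirectories directly under the hbase root dir.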
+ Path walsDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME); + Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME); + LOG.info("Running Search with keys inputDir=" + inputDir +", numMappers=" + numMappers + + " against " + getConf().get(HConstants.HBASE_DIR)); + int ret = ToolRunner.run(new WALSearcher(getConf()), new String [] {walsDir.toString(), ""}); + if (ret != 0) return ret; + return ToolRunner.run(new WALSearcher(getConf()), new String [] {oldWalsDir.toString(), ""}); + } + + static SortedSet readKeysToSearch(final Configuration conf) + throws IOException, InterruptedException { + Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY)); + FileSystem fs = FileSystem.get(conf); + SortedSet result = new TreeSet(Bytes.BYTES_COMPARATOR); + if (!fs.exists(keysInputDir)) { + throw new FileNotFoundException(keysInputDir.toString()); + } + if (!fs.isDirectory(keysInputDir)) { + throw new UnsupportedOperationException("TODO"); + } else { + RemoteIterator iterator = fs.listFiles(keysInputDir, false); + while(iterator.hasNext()) { + LocatedFileStatus keyFileStatus = iterator.next(); + // Skip "_SUCCESS" file. + if (keyFileStatus.getPath().getName().startsWith("_")) continue; + result.addAll(readFileToSearch(conf, fs, keyFileStatus)); + } + } + return result; + } + + private static SortedSet readFileToSearch(final Configuration conf, + final FileSystem fs, final LocatedFileStatus keyFileStatus) + throws IOException, InterruptedException { + SortedSet result = new TreeSet(Bytes.BYTES_COMPARATOR); + // Return entries that are flagged Counts.UNDEFINED in the value. Return the row. This is + // what is missing. + TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); + try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr = + new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) { + InputSplit is = + new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(), new String [] {}); + rr.initialize(is, context); + while (rr.nextKeyValue()) { + rr.getCurrentKey(); + BytesWritable bw = rr.getCurrentValue(); + switch (Verify.VerifyReducer.whichType(bw.getBytes())) { + case UNDEFINED: + byte [] key = new byte [rr.getCurrentKey().getLength()]; + System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0, + rr.getCurrentKey().getLength()); + result.add(key); + break; + } + } + } + return result; + } + } + /** * A Map Reduce job that verifies that the linked lists generated by * {@link Generator} do not have any holes. @@ -594,21 +764,84 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { } } + /** + * Don't change the order of these enums. Their ordinals are used as type flag when we emit + * problems found from the reducer. + */ public static enum Counts { - UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES + UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES, EXTRA_UNDEF_REFERENCES } - public static class VerifyReducer extends Reducer { + /** + * Per reducer, we output problem rows as byte arrasy so can be used as input for + * subsequent investigative mapreduce jobs. Each emitted value is prefaced by a one byte flag + * saying what sort of emission it is. Flag is the Count enum ordinal as a short. 
+ */ + public static class VerifyReducer + extends Reducer { private ArrayList refs = new ArrayList(); + private final BytesWritable UNREF = + new BytesWritable(addPrefixFlag(Counts.UNREFERENCED.ordinal(), new byte [] {})); private AtomicInteger rows = new AtomicInteger(0); + private Connection connection; + + @Override + protected void setup(Reducer.Context context) + throws IOException, InterruptedException { + super.setup(context); + this.connection = ConnectionFactory.createConnection(context.getConfiguration()); + } + + @Override + protected void cleanup(Reducer.Context context) + throws IOException, InterruptedException { + if (this.connection != null) this.connection.close(); + super.cleanup(context); + } + + /** + * @param ordinal + * @param r + * @return Return new byte array that has ordinal as prefix on front taking up + * Bytes.SIZEOF_SHORT bytes followed by r + */ + public static byte [] addPrefixFlag(final int ordinal, final byte [] r) { + byte [] prefix = Bytes.toBytes((short)ordinal); + if (prefix.length != Bytes.SIZEOF_SHORT) { + throw new RuntimeException("Unexpected size: " + prefix.length); + } + byte [] result = new byte [prefix.length + r.length]; + System.arraycopy(prefix, 0, result, 0, prefix.length); + System.arraycopy(r, 0, result, prefix.length, r.length); + return result; + } + + /** + * @param bs + * @return Type from the Counts enum of this row. Reads prefix added by + * {@link #addPrefixFlag(int, byte[])} + */ + public static Counts whichType(final byte [] bs) { + int ordinal = Bytes.toShort(bs, 0, Bytes.SIZEOF_SHORT); + return Counts.values()[ordinal]; + } + + /** + * @param bw + * @return Row bytes minus the type flag. + */ + public static byte [] getRowOnly(BytesWritable bw) { + byte [] bytes = new byte [bw.getLength() - Bytes.SIZEOF_SHORT]; + System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0, bytes.length); + return bytes; + } @Override public void reduce(BytesWritable key, Iterable values, Context context) - throws IOException, InterruptedException { + throws IOException, InterruptedException { int defCount = 0; - refs.clear(); for (BytesWritable type : values) { if (type.getLength() == DEF.getLength()) { @@ -621,48 +854,110 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { } // TODO check for more than one def, should not happen - StringBuilder refsSb = null; - String keyString = null; + String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength()); if (defCount == 0 || refs.size() != 1) { - refsSb = new StringBuilder(); - String comma = ""; - for (byte[] ref : refs) { - refsSb.append(comma); - comma = ","; - refsSb.append(Bytes.toStringBinary(ref)); - } - keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength()); - - LOG.error("Linked List error: Key = " + keyString + " References = " + refsSb.toString()); + refsSb = dumpExtraInfoOnRefs(key, context, refs); + LOG.error("LinkedListError: key=" + keyString + ", reference(s)=" + + (refsSb != null? refsSb.toString(): "")); } if (defCount == 0 && refs.size() > 0) { - // this is bad, found a node that is referenced but not defined. It must have been + // This is bad, found a node that is referenced but not defined. It must have been // lost, emit some info about this node for debugging purposes. - context.write(new Text(keyString), new Text(refsSb.toString())); - context.getCounter(Counts.UNDEFINED).increment(1); + // Write out a line per reference. 
If more than one, flag it.; + for (int i = 0; i < refs.size(); i++) { + byte [] bs = refs.get(i); + int ordinal; + if (i <= 0) { + ordinal = Counts.UNDEFINED.ordinal(); + context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs))); + context.getCounter(Counts.UNDEFINED).increment(1); + } else { + ordinal = Counts.EXTRA_UNDEF_REFERENCES.ordinal(); + context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs))); + } + } if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) { + // Print out missing row; doing get on reference gives info on when the referencer + // was added which can help a little debugging. This info is only available in mapper + // output -- the 'Linked List error Key...' log message above. What we emit here is + // useless for debugging. context.getCounter("undef", keyString).increment(1); } } else if (defCount > 0 && refs.size() == 0) { // node is defined but not referenced - context.write(new Text(keyString), new Text("none")); + context.write(key, UNREF); context.getCounter(Counts.UNREFERENCED).increment(1); if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) { context.getCounter("unref", keyString).increment(1); } } else { if (refs.size() > 1) { - if (refsSb != null) { - context.write(new Text(keyString), new Text(refsSb.toString())); + // Skip first reference. + for (int i = 1; i < refs.size(); i++) { + context.write(key, + new BytesWritable(addPrefixFlag(Counts.EXTRAREFERENCES.ordinal(), refs.get(i)))); } context.getCounter(Counts.EXTRAREFERENCES).increment(refs.size() - 1); } // node is defined and referenced context.getCounter(Counts.REFERENCED).increment(1); } + } + /** + * Dump out extra info around references if there are any. Helps debugging. + * @return StringBuilder filled with references if any. + * @throws IOException + */ + private StringBuilder dumpExtraInfoOnRefs(final BytesWritable key, final Context context, + final List refs) + throws IOException { + StringBuilder refsSb = null; + if (refs.isEmpty()) return refsSb; + refsSb = new StringBuilder(); + String comma = ""; + // If a row is a reference but has no define, print the content of the row that has + // this row as a 'prev'; it will help debug. The missing row was written just before + // the row we are dumping out here. + TableName tn = getTableName(context.getConfiguration()); + try (Table t = this.connection.getTable(tn)) { + for (byte [] ref : refs) { + Result r = t.get(new Get(ref)); + List cells = r.listCells(); + String ts = (cells != null && !cells.isEmpty())? + new java.util.Date(cells.get(0).getTimestamp()).toString(): ""; + byte [] b = r.getValue(FAMILY_NAME, COLUMN_CLIENT); + String jobStr = (b != null && b.length > 0)? Bytes.toString(b): ""; + b = r.getValue(FAMILY_NAME, COLUMN_COUNT); + long count = (b != null && b.length > 0)? Bytes.toLong(b): -1; + b = r.getValue(FAMILY_NAME, COLUMN_PREV); + String refRegionLocation = ""; + String keyRegionLocation = ""; + if (b != null && b.length > 0) { + try (RegionLocator rl = this.connection.getRegionLocator(tn)) { + HRegionLocation hrl = rl.getRegionLocation(b); + if (hrl != null) refRegionLocation = hrl.toString(); + // Key here probably has trailing zeros on it. 
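+              // (BytesWritable.getBytes() returns the padded backing array; only the first getLength() bytes are the actual row.)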
+ hrl = rl.getRegionLocation(key.getBytes()); + if (hrl != null) keyRegionLocation = hrl.toString(); + } + } + LOG.error("Extras on ref without a def, ref=" + Bytes.toStringBinary(ref) + + ", refPrevEqualsKey=" + + (Bytes.compareTo(key.getBytes(), 0, key.getLength(), b, 0, b.length) == 0) + + ", key=" + Bytes.toStringBinary(key.getBytes(), 0, key.getLength()) + + ", ref row date=" + ts + ", jobStr=" + jobStr + + ", ref row count=" + count + + ", ref row regionLocation=" + refRegionLocation + + ", key row regionLocation=" + keyRegionLocation); + refsSb.append(comma); + comma = ","; + refsSb.append(Bytes.toStringBinary(ref)); + } + } + return refsSb; } } @@ -707,7 +1002,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { job.getConfiguration().setBoolean("mapreduce.map.speculative", false); job.setReducerClass(VerifyReducer.class); - job.setOutputFormatClass(TextOutputFormat.class); + job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class); + job.setOutputKeyClass(BytesWritable.class); + job.setOutputValueClass(BytesWritable.class); TextOutputFormat.setOutputPath(job, outputDir); boolean success = job.waitForCompletion(true); @@ -756,23 +1053,26 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { protected void handleFailure(Counters counters) throws IOException { Configuration conf = job.getConfiguration(); - HConnection conn = HConnectionManager.getConnection(conf); TableName tableName = getTableName(conf); - CounterGroup g = counters.getGroup("undef"); - Iterator it = g.iterator(); - while (it.hasNext()) { - String keyString = it.next().getName(); - byte[] key = Bytes.toBytes(keyString); - HRegionLocation loc = conn.relocateRegion(tableName, key); - LOG.error("undefined row " + keyString + ", " + loc); - } - g = counters.getGroup("unref"); - it = g.iterator(); - while (it.hasNext()) { - String keyString = it.next().getName(); - byte[] key = Bytes.toBytes(keyString); - HRegionLocation loc = conn.relocateRegion(tableName, key); - LOG.error("unreferred row " + keyString + ", " + loc); + try (Connection conn = ConnectionFactory.createConnection(conf)) { + try (RegionLocator rl = conn.getRegionLocator(tableName)) { + CounterGroup g = counters.getGroup("undef"); + Iterator it = g.iterator(); + while (it.hasNext()) { + String keyString = it.next().getName(); + byte[] key = Bytes.toBytes(keyString); + HRegionLocation loc = rl.getRegionLocation(key, true); + LOG.error("undefined row " + keyString + ", " + loc); + } + g = counters.getGroup("unref"); + it = g.iterator(); + while (it.hasNext()) { + String keyString = it.next().getName(); + byte[] key = Bytes.toBytes(keyString); + HRegionLocation loc = rl.getRegionLocation(key, true); + LOG.error("unreferred row " + keyString + ", " + loc); + } + } } } } @@ -944,7 +1244,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { } /** - * A stand alone program that follows a linked list created by {@link Generator} and prints timing info. + * A stand alone program that follows a linked list created by {@link Generator} and prints + * timing info. 
*/ private static class Walker extends Configured implements Tool { @Override @@ -1048,7 +1349,6 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { } private static class Clean extends Configured implements Tool { - @Override public int run(String[] args) throws Exception { if (args.length < 1) { System.err.println("Usage: Clean "); @@ -1136,16 +1436,17 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { private void printCommands() { System.err.println("Commands:"); - System.err.println(" Generator Map only job that generates data."); - System.err.println(" Verify A map reduce job that looks for holes. Look at the counts "); + System.err.println(" generator Map only job that generates data."); + System.err.println(" verify A map reduce job that looks for holes. Look at the counts "); System.err.println(" after running. See REFERENCED and UNREFERENCED are ok. Any "); System.err.println(" UNDEFINED counts are bad. Do not run with the Generator."); - System.err.println(" Walker " + - "Standalong program that starts following a linked list & emits timing info."); - System.err.println(" Print Standalone program that prints nodes in the linked list."); - System.err.println(" Delete Standalone program that deletes a·single node."); - System.err.println(" Loop Program to Loop through Generator and Verify steps"); - System.err.println(" Clean Program to clean all left over detritus."); + System.err.println(" walker " + + "Standalone program that starts following a linked list & emits timing info."); + System.err.println(" print Standalone program that prints nodes in the linked list."); + System.err.println(" delete Standalone program that deletes a·single node."); + System.err.println(" loop Program to Loop through Generator and Verify steps"); + System.err.println(" clean Program to clean all left over detritus."); + System.err.println(" search Search for missing keys."); System.err.flush(); } @@ -1158,6 +1459,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { printUsage(this.getClass().getSimpleName() + " COMMAND []", "General options:", ""); printCommands(); + // Have to throw an exception here to stop the processing. Looks ugly but gets message across. 
throw new RuntimeException("Incorrect Number of args."); } toRun = args[0]; @@ -1168,7 +1470,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { public int runTestFromCommandLine() throws Exception { Tool tool = null; - if (toRun.equals("Generator")) { + if (toRun.equalsIgnoreCase("Generator")) { tool = new Generator(); } else if (toRun.equalsIgnoreCase("Verify")) { tool = new Verify(); @@ -1184,6 +1486,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { tool = new Delete(); } else if (toRun.equalsIgnoreCase("Clean")) { tool = new Clean(); + } else if (toRun.equalsIgnoreCase("Search")) { + tool = new Search(); } else { usage(); throw new RuntimeException("Unknown arg"); @@ -1227,4 +1531,4 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(), args); System.exit(ret); } -} \ No newline at end of file +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java index a04cb88ae13..d682ccce63c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java @@ -221,9 +221,9 @@ public class HFileArchiver { } // otherwise we attempt to archive the store files - if (LOG.isTraceEnabled()) LOG.trace("Archiving compacted store files."); + if (LOG.isDebugEnabled()) LOG.debug("Archiving compacted store files."); - // wrap the storefile into a File + // Wrap the storefile into a File StoreToFile getStorePath = new StoreToFile(fs); Collection storeFiles = Collections2.transform(compactedFiles, getStorePath); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index 67d9b0d9567..c20e375a8a0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -128,14 +128,12 @@ public class WALPlayer extends Configured implements Tool { * A mapper that writes out {@link Mutation} to be directly applied to * a running HBase instance. */ - static class WALMapper + protected static class WALMapper extends Mapper { - private Map tables = - new TreeMap(); + private Map tables = new TreeMap(); @Override - public void map(WALKey key, WALEdit value, - Context context) + public void map(WALKey key, WALEdit value, Context context) throws IOException { try { if (tables.isEmpty() || tables.containsKey(key.getTablename())) { @@ -150,26 +148,28 @@ public class WALPlayer extends Configured implements Tool { // filtering WAL meta entries if (WALEdit.isMetaEditFamily(cell.getFamily())) continue; - // A WALEdit may contain multiple operations (HBASE-3584) and/or - // multiple rows (HBASE-5229). - // Aggregate as much as possible into a single Put/Delete - // operation before writing to the context. - if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte() - || !CellUtil.matchingRow(lastCell, cell)) { - // row or type changed, write out aggregate KVs. - if (put != null) context.write(tableOut, put); - if (del != null) context.write(tableOut, del); - - if (CellUtil.isDelete(cell)) { - del = new Delete(cell.getRow()); - } else { - put = new Put(cell.getRow()); + // Allow a subclass filter out this cell. 
+ if (filter(context, cell)) { + // A WALEdit may contain multiple operations (HBASE-3584) and/or + // multiple rows (HBASE-5229). + // Aggregate as much as possible into a single Put/Delete + // operation before writing to the context. + if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte() + || !CellUtil.matchingRow(lastCell, cell)) { + // row or type changed, write out aggregate KVs. + if (put != null) context.write(tableOut, put); + if (del != null) context.write(tableOut, del); + if (CellUtil.isDelete(cell)) { + del = new Delete(cell.getRow()); + } else { + put = new Put(cell.getRow()); + } + } + if (CellUtil.isDelete(cell)) { + del.addDeleteMarker(cell); + } else { + put.add(cell); } - } - if (CellUtil.isDelete(cell)) { - del.addDeleteMarker(cell); - } else { - put.add(cell); } lastCell = cell; } @@ -182,18 +182,30 @@ public class WALPlayer extends Configured implements Tool { } } + /** + * @param cell + * @return Return true if we are to emit this cell. + */ + protected boolean filter(Context context, final Cell cell) { + return true; + } + @Override public void setup(Context context) throws IOException { String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY); String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY); - if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) { + if (tablesToUse == null && tableMap == null) { + // Then user wants all tables. + } else if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) { // this can only happen when WALMapper is used directly by a class other than WALPlayer throw new IOException("No tables or incorrect table mapping specified."); } int i = 0; - for (String table : tablesToUse) { - tables.put(TableName.valueOf(table), + if (tablesToUse != null) { + for (String table : tablesToUse) { + tables.put(TableName.valueOf(table), TableName.valueOf(tableMap[i++])); + } } } } @@ -337,4 +349,4 @@ public class WALPlayer extends Configured implements Tool { Job job = createSubmittableJob(otherArgs); return job.waitForCompletion(true) ? 0 : 1; } -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 8a7a362808b..9ac208498be 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -167,7 +167,7 @@ public class SplitLogManager { /** * Get a list of paths that need to be split given a set of server-specific directories and - * optinally a filter. + * optionally a filter. * * See {@link DefaultWALProvider#getServerNameFromWALDirectoryName} for more info on directory * layout. 
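To make the new WALMapper hook in the WALPlayer change above concrete, here is a minimal sketch (not part of this patch; the class names and the row value are made up) of a WALPlayer subclass whose mapper overrides the protected filter(Context, Cell) method to replay only a single row. The WALMapperSearcher in IntegrationTestBigLinkedList follows the same pattern, loading its row set in setup() instead of hard-coding it.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

// Hypothetical example, not part of the patch: replay WAL entries for one row only.
public class SingleRowWALPlayer extends WALPlayer {
  public SingleRowWALPlayer(Configuration conf) {
    super(conf);
  }

  public static class SingleRowMapper extends WALMapper {
    private static final byte[] ROW = Bytes.toBytes("row-0001"); // made-up row

    @Override
    protected boolean filter(Context context, Cell cell) {
      // Returning false drops the cell before it is aggregated into a Put/Delete.
      return Bytes.equals(ROW, 0, ROW.length,
          cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
    }
  }

  @Override
  public Job createSubmittableJob(String[] args) throws IOException {
    Job job = super.createSubmittableJob(args);
    // Swap in the filtering mapper, as WALSearcher does.
    job.setMapperClass(SingleRowMapper.class);
    return job;
  }
}
```

Such a subclass would be launched the same way the Search tool launches WALSearcher above: via ToolRunner, with the WAL directory as the first argument.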
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java index 05a5a9edbcb..5a93a6d0c4d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java @@ -251,8 +251,8 @@ public abstract class CleanerChore extends Schedu int deletedFileCount = 0; for (FileStatus file : filesToDelete) { Path filePath = file.getPath(); - if (LOG.isTraceEnabled()) { - LOG.trace("Removing: " + filePath + " from archive"); + if (LOG.isDebugEnabled()) { + LOG.debug("Removing: " + filePath + " from archive"); } try { boolean success = this.fs.delete(filePath, false); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 26f8943d731..c83841a7012 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3666,11 +3666,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // internalFlushcache(null, seqid, stores.values(), status); } // Now delete the content of recovered edits. We're done w/ them. - for (Path file: files) { - if (!fs.delete(file, false)) { - LOG.error("Failed delete of " + file); - } else { - LOG.debug("Deleted recovered.edits file=" + file); + if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) { + // For debugging data loss issues! + // If this flag is set, make use of the hfile archiving by making recovered.edits a fake + // column family. Have to fake out file type too by casting our recovered.edits as storefiles + String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName(); + Set fakeStoreFiles = new HashSet(files.size()); + for (Path file: files) { + fakeStoreFiles.add(new StoreFile(getRegionFileSystem().getFileSystem(), file, this.conf, + null, null)); + } + getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles); + } else { + for (Path file: files) { + if (!fs.delete(file, false)) { + LOG.error("Failed delete of " + file); + } else { + LOG.debug("Deleted recovered.edits file=" + file); + } } } return seqid; @@ -3710,8 +3723,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // try { // How many edits seen before we check elapsed time - int interval = this.conf.getInt("hbase.hstore.report.interval.edits", - 2000); + int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000); // How often to send a progress report (default 1/2 master timeout) int period = this.conf.getInt("hbase.hstore.report.period", 300000); long lastReport = EnvironmentEdgeManager.currentTime(); @@ -3770,21 +3782,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // continue; } } + // Check this edit is for this region. + if (!Bytes.equals(key.getEncodedRegionName(), + this.getRegionInfo().getEncodedNameAsBytes())) { + skippedEdits++; + continue; + } boolean flush = false; for (Cell cell: val.getCells()) { // Check this edit is for me. 
Also, guard against writing the special // METACOLUMN info such as HBASE::CACHEFLUSH entries - if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY) || - !Bytes.equals(key.getEncodedRegionName(), - this.getRegionInfo().getEncodedNameAsBytes())) { + if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { //this is a special edit, we should handle it CompactionDescriptor compaction = WALEdit.getCompaction(cell); if (compaction != null) { //replay the compaction completeCompactionMarker(compaction); } - skippedEdits++; continue; } @@ -3810,10 +3825,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // Once we are over the limit, restoreEdit will keep returning true to // flush -- but don't flush until we've played all the kvs that make up // the WALEdit. - if (!flush) { - flush = restoreEdit(store, cell); - } - + flush |= restoreEdit(store, cell); editsCount++; } if (flush) { @@ -5040,6 +5052,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * @return qualified path of region directory */ @Deprecated + @VisibleForTesting public static Path getRegionDir(final Path rootdir, final HRegionInfo info) { return new Path( FSUtils.getTableDir(rootdir, info.getTable()), info.getEncodedName()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 19b4719b92d..0211a172fa6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -1018,6 +1018,7 @@ public class FSHLog implements WAL { i.preLogArchive(p, newPath); } } + LOG.info("Archiving " + p + " to " + newPath); if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) { throw new IOException("Unable to rename " + p + " to " + newPath); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java index f5b026934ff..0d0912eccd8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -1204,7 +1204,7 @@ public abstract class FSUtils { private List blacklist; /** - * Create a filter on the give filesystem with the specified blacklist + * Create a filter on the givem filesystem with the specified blacklist * @param fs filesystem to filter * @param directoryNameBlackList list of the names of the directories to filter. 
If * null, all directories are returned diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java index 104faad3013..720cedcee3c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java @@ -75,7 +75,7 @@ public class WALPrettyPrinter { // enable in order to output a single list of transactions from several files private boolean persistentOutput; private boolean firstTxn; - // useful for programatic capture of JSON output + // useful for programmatic capture of JSON output private PrintStream out; // for JSON encoding private static final ObjectMapper MAPPER = new ObjectMapper(); @@ -267,8 +267,9 @@ public class WALPrettyPrinter { Map op = new HashMap(toStringMap(cell)); if (outputValues) op.put("value", Bytes.toStringBinary(cell.getValue())); // check row output filter - if (row == null || ((String) op.get("row")).equals(row)) + if (row == null || ((String) op.get("row")).equals(row)) { actions.add(op); + } } if (actions.size() == 0) continue; @@ -283,22 +284,16 @@ public class WALPrettyPrinter { out.print(MAPPER.writeValueAsString(txn)); } else { // Pretty output, complete with indentation by atomic action - out.println("Sequence " + txn.get("sequence") + " " - + "from region " + txn.get("region") + " " + "in table " - + txn.get("table") + " at write timestamp: " + new Date(writeTime)); + out.println("Sequence=" + txn.get("sequence") + " " + + ", region=" + txn.get("region") + " at write timestamp=" + new Date(writeTime)); for (int i = 0; i < actions.size(); i++) { Map op = actions.get(i); - out.println(" Action:"); - out.println(" row: " + op.get("row")); - out.println(" column: " + op.get("family") + ":" - + op.get("qualifier")); - out.println(" timestamp: " - + (new Date((Long) op.get("timestamp")))); - if(op.get("tag") != null) { + out.println("row=" + op.get("row") + + ", column=" + op.get("family") + ":" + op.get("qualifier")); + if (op.get("tag") != null) { out.println(" tag: " + op.get("tag")); } - if (outputValues) - out.println(" value: " + op.get("value")); + if (outputValues) out.println(" value: " + op.get("value")); } } } @@ -347,8 +342,6 @@ public class WALPrettyPrinter { * Command line arguments * @throws IOException * Thrown upon file system errors etc. - * @throws ParseException - * Thrown if command-line parsing fails. 
*/ public static void run(String[] args) throws IOException { // create options @@ -364,7 +357,7 @@ public class WALPrettyPrinter { WALPrettyPrinter printer = new WALPrettyPrinter(); CommandLineParser parser = new PosixParser(); - List files = null; + List files = null; try { CommandLine cmd = parser.parse(options, args); files = cmd.getArgList(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 2ddc9d1344b..c187af1fb84 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -49,13 +49,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; @@ -73,9 +68,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.TagRewriteCell; -import org.apache.hadoop.hbase.TagType; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; @@ -83,9 +76,9 @@ import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; -import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.exceptions.RegionOpeningException; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.master.SplitLogManager; @@ -105,9 +98,12 @@ import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.R import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.LastSequenceId; -import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.WAL.Reader; -import org.apache.hadoop.hbase.wal.WALProvider.Writer; +// imports for things that haven't moved from regionserver.wal yet. 
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.ClassSize; @@ -115,17 +111,17 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WAL.Reader; +import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.ipc.RemoteException; -// imports for things that haven't moved from regionserver.wal yet. -import org.apache.hadoop.hbase.regionserver.wal.FSHLog; -import org.apache.hadoop.hbase.regionserver.wal.HLogKey; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink; -import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; -import org.apache.hadoop.hbase.regionserver.wal.WALUtil; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.protobuf.ServiceException; /** * This class is responsible for splitting up a bunch of regionserver commit log @@ -269,8 +265,7 @@ public class WALSplitter { * log splitting implementation, splits one log file. * @param logfile should be an actual log file. */ - boolean splitLogFile(FileStatus logfile, - CancelableProgressable reporter) throws IOException { + boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException { Preconditions.checkState(status == null); Preconditions.checkArgument(logfile.isFile(), "passed in file status is for something other than a regular file."); @@ -399,8 +394,9 @@ public class WALSplitter { } finally { String msg = "Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions() - + " regions; log file=" + logPath + " is corrupted = " + isCorrupted - + " progress failed = " + progress_failed; + + " regions; edits skipped=" + editsSkipped + "; log file=" + logPath + + ", length=" + logfile.getLen() + // See if length got updated post lease recovery + ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed; LOG.info(msg); status.markComplete(msg); } @@ -714,8 +710,8 @@ public class WALSplitter { throw new IOException("Failed to create SeqId file:" + newSeqIdFile); } if (LOG.isDebugEnabled()) { - LOG.debug("Written region seqId to file:" + newSeqIdFile + " ,newSeqId=" + newSeqId - + " ,maxSeqId=" + maxSeqId); + LOG.debug("Wrote region seqId=" + newSeqIdFile + " to file, newSeqId=" + newSeqId + + ", maxSeqId=" + maxSeqId); } } catch (FileAlreadyExistsException ignored) { // latest hdfs throws this exception. 
it's all right if newSeqIdFile already exists diff --git a/hbase-server/src/test/data/0000000000000016310 b/hbase-server/src/test/data/0000000000000016310 new file mode 100644 index 00000000000..8e58c98d4e9 Binary files /dev/null and b/hbase-server/src/test/data/0000000000000016310 differ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index f9811856cf6..8613276c21c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -2526,14 +2526,14 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * Stops the previously started MiniMRCluster. */ public void shutdownMiniMapReduceCluster() { - LOG.info("Stopping mini mapreduce cluster..."); if (mrCluster != null) { + LOG.info("Stopping mini mapreduce cluster..."); mrCluster.shutdown(); mrCluster = null; + LOG.info("Mini mapreduce cluster stopped"); } // Restore configuration to point to local jobtracker conf.set("mapreduce.jobtracker.address", "local"); - LOG.info("Mini mapreduce cluster stopped"); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java new file mode 100644 index 00000000000..3d651ef0d00 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALSplitter; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.mortbay.log.Log; + +/** + * Tests around replay of recovered.edits content. + */ +@Category({MediumTests.class}) +public class TestRecoveredEdits { + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + @Rule public TestName testName = new TestName(); + + /** + * HBASE-12782 ITBLL fails for me if generator does anything but 5M per maptask. + * Create a region. Close it. Then copy into place a file to replay, one that is bigger than + * configured flush size so we bring on lots of flushes. Then reopen and confirm all edits + * made it in. + * @throws IOException + */ + @Test (timeout=30000) + public void testReplayWorksThoughLotsOfFlushing() throws IOException { + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + // Set it so we flush every 1M or so. Thats a lot. + conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024*1024); + // The file of recovered edits has a column family of 'meta'. Also has an encoded regionname + // of 4823016d8fca70b25503ee07f4c6d79f which needs to match on replay. + final String encodedRegionName = "4823016d8fca70b25503ee07f4c6d79f"; + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(testName.getMethodName())); + final String columnFamily = "meta"; + byte [][] columnFamilyAsByteArray = new byte [][] {Bytes.toBytes(columnFamily)}; + htd.addFamily(new HColumnDescriptor(columnFamily)); + HRegionInfo hri = new HRegionInfo(htd.getTableName()) { + @Override + public synchronized String getEncodedName() { + return encodedRegionName; + } + + // Cache the name because lots of lookups. + private byte [] encodedRegionNameAsBytes = null; + @Override + public synchronized byte[] getEncodedNameAsBytes() { + if (encodedRegionNameAsBytes == null) { + this.encodedRegionNameAsBytes = Bytes.toBytes(getEncodedName()); + } + return this.encodedRegionNameAsBytes; + } + }; + Path hbaseRootDir = TEST_UTIL.getDataTestDir(); + HRegion region = HRegion.createHRegion(hri, hbaseRootDir, conf, htd, null); + assertEquals(encodedRegionName, region.getRegionInfo().getEncodedName()); + List storeFiles = region.getStoreFileList(columnFamilyAsByteArray); + // There should be no store files. 
+ assertTrue(storeFiles.isEmpty()); + region.close(); + Path regionDir = region.getRegionDir(hbaseRootDir, hri); + Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir); + // This is a little fragile getting this path to a file of 10M of edits. + Path recoveredEditsFile = new Path(new Path( + System.getProperty("project.build.testSourceDirectory", "src" + Path.SEPARATOR + "test"), + "data"), "0000000000000016310"); + // Copy this file under the region's recovered.edits dir so it is replayed on reopen. + FileSystem fs = FileSystem.get(conf); + Path destination = new Path(recoveredEditsDir, recoveredEditsFile.getName()); + fs.copyToLocalFile(recoveredEditsFile, destination); + assertTrue(fs.exists(destination)); + // Now the file 0000000000000016310 is under recovered.edits, reopen the region to replay. + region = HRegion.openHRegion(region, null); + assertEquals(encodedRegionName, region.getRegionInfo().getEncodedName()); + storeFiles = region.getStoreFileList(columnFamilyAsByteArray); + // Our 0000000000000016310 is 10MB. Most of the edits are for one region. Lets assume that if + // we flush at 1MB, that there are at least 3 flushed files that are there because of the + // replay of edits. + assertTrue("Files count=" + storeFiles.size(), storeFiles.size() > 10); + // Now verify all edits made it into the region. + int count = verifyAllEditsMadeItIn(fs, conf, recoveredEditsFile, region); + Log.info("Checked " + count + " edits made it in"); + } + + /** + * @param fs + * @param conf + * @param edits + * @param region + * @return Return how many edits seen. + * @throws IOException + */ + private int verifyAllEditsMadeItIn(final FileSystem fs, final Configuration conf, + final Path edits, final HRegion region) + throws IOException { + int count = 0; + // Based on HRegion#replayRecoveredEdits + WAL.Reader reader = null; + try { + reader = WALFactory.createReader(fs, edits, conf); + WAL.Entry entry; + while ((entry = reader.next()) != null) { + WALKey key = entry.getKey(); + WALEdit val = entry.getEdit(); + count++; + // Check this edit is for this region. + if (!Bytes.equals(key.getEncodedRegionName(), + region.getRegionInfo().getEncodedNameAsBytes())) { + continue; + } + Cell previous = null; + for (Cell cell: val.getCells()) { + if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) continue; + if (previous != null && CellComparator.compareRows(previous, cell) == 0) continue; + previous = cell; + Get g = new Get(CellUtil.cloneRow(cell)); + Result r = region.get(g); + boolean found = false; + for (CellScanner scanner = r.cellScanner(); scanner.advance();) { + Cell current = scanner.current(); + if (CellComparator.compare(cell, current, true) == 0) { + found = true; + break; + } + } + assertTrue("Failed to find " + cell, found); + } + } + } finally { + if (reader != null) reader.close(); + } + return count; + } +} \ No newline at end of file
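As a footnote on the Verify output format introduced above: each problem row is emitted as the row bytes prefixed by the Counts ordinal encoded as a two-byte short, which is what Search.readFileToSearch() keys off when it collects the UNDEFINED rows. Below is a standalone sketch (not part of the patch; the class name and row value are made up) mirroring the addPrefixFlag()/whichType() round trip, assuming only hbase-common's Bytes utility on the classpath.

```java
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical demo, not part of the patch: encode/decode the VerifyReducer flag prefix.
public class PrefixFlagDemo {
  enum Counts { UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES, EXTRA_UNDEF_REFERENCES }

  // Mirrors VerifyReducer.addPrefixFlag(): two-byte ordinal followed by the row.
  static byte[] addPrefixFlag(int ordinal, byte[] row) {
    byte[] prefix = Bytes.toBytes((short) ordinal);
    byte[] result = new byte[prefix.length + row.length];
    System.arraycopy(prefix, 0, result, 0, prefix.length);
    System.arraycopy(row, 0, result, prefix.length, row.length);
    return result;
  }

  // Mirrors VerifyReducer.whichType(): read the leading short back as a Counts ordinal.
  static Counts whichType(byte[] bytes) {
    return Counts.values()[Bytes.toShort(bytes, 0, Bytes.SIZEOF_SHORT)];
  }

  public static void main(String[] args) {
    byte[] flagged = addPrefixFlag(Counts.UNDEFINED.ordinal(), Bytes.toBytes("some-missing-row"));
    System.out.println(whichType(flagged));                           // prints UNDEFINED
    System.out.println(Bytes.toStringBinary(flagged, Bytes.SIZEOF_SHORT,
        flagged.length - Bytes.SIZEOF_SHORT));                        // prints the row back
  }
}
```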