HBASE-12782 ITBLL fails for me if generator does anything but 5M per maptask

stack 2015-01-30 19:12:17 -08:00
parent b08802a3e8
commit 825871431e
14 changed files with 646 additions and 154 deletions

View File

@ -20,15 +20,13 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterManager.ServiceType;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
@ -42,8 +40,6 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import com.google.common.collect.Sets;
/**
* Manages the interactions with an already deployed distributed cluster (as opposed to
* a pseudo-distributed, or mini/local cluster). This is used by integration and system tests.

View File

@ -21,12 +21,16 @@ package org.apache.hadoop.hbase.test;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.FileNotFoundException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
@ -40,10 +44,14 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.IntegrationTestBase;
@ -57,9 +65,9 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@ -70,13 +78,15 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
@ -89,10 +99,15 @@ import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
@ -382,6 +397,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
current[i] = new byte[key.getLength()];
System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength());
if (++i == current.length) {
LOG.info("Persisting current.length=" + current.length + ", count=" + count + ", id=" +
Bytes.toStringBinary(id) + ", current=" + Bytes.toStringBinary(current[0]) +
", i=" + i);
persist(output, count, prev, current, id);
i = 0;
@ -473,8 +491,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
"pre-splitting table into " + totalNumberOfRegions + " regions " +
"(default regions per server: " + regionsPerServer + ")");
byte[][] splits = new RegionSplitter.UniformSplit().split(
totalNumberOfRegions);
byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions);
admin.createTable(htd, splits);
}
@ -563,6 +580,159 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
}
}
/**
* Tool to search for missing rows in WALs and hfiles.
* Pass in a file or dir of keys to search for. The key file must have been written by the
* Verify step (we depend on the format it writes out). We read the keys in and then search
* the hbase WALs and oldWALs dirs (some of this is TODO).
*/
static class Search extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(Search.class);
protected Job job;
private static void printUsage(final String error) {
if (error != null && error.length() > 0) System.out.println("ERROR: " + error);
System.err.println("Usage: search <KEYS_DIR> [<MAPPERS_COUNT>]");
}
@Override
public int run(String[] args) throws Exception {
if (args.length < 1 || args.length > 2) {
printUsage(null);
return 1;
}
Path inputDir = new Path(args[0]);
int numMappers = 1;
if (args.length > 1) {
numMappers = Integer.parseInt(args[1]);
}
return run(inputDir, numMappers);
}
/**
* WALPlayer override that searches for keys loaded in the setup.
*/
public static class WALSearcher extends WALPlayer {
public WALSearcher(Configuration conf) {
super(conf);
}
/**
* The actual searcher mapper.
*/
public static class WALMapperSearcher extends WALMapper {
private SortedSet<byte []> keysToFind;
@Override
public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
throws IOException {
super.setup(context);
try {
this.keysToFind = readKeysToSearch(context.getConfiguration());
LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
} catch (InterruptedException e) {
throw new InterruptedIOException(e.toString());
}
}
@Override
protected boolean filter(Context context, Cell cell) {
// TODO: Can I do a better compare than this copying out key?
byte [] row = new byte [cell.getRowLength()];
System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
boolean b = this.keysToFind.contains(row);
if (b) {
String keyStr = Bytes.toStringBinary(row);
LOG.info("Found cell=" + cell);
context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
}
return b;
}
}
// Put in place the above WALMapperSearcher.
@Override
public Job createSubmittableJob(String[] args) throws IOException {
Job job = super.createSubmittableJob(args);
// Call my class instead.
job.setJarByClass(WALMapperSearcher.class);
job.setMapperClass(WALMapperSearcher.class);
job.setOutputFormatClass(NullOutputFormat.class);
return job;
}
}
static final String FOUND_GROUP_KEY = "Found";
static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";
public int run(Path inputDir, int numMappers) throws Exception {
getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
SortedSet<byte []> keys = readKeysToSearch(getConf());
if (keys.isEmpty()) throw new RuntimeException("No keys to find");
LOG.info("Count of keys to find: " + keys.size());
for(byte [] key: keys) LOG.info("Key: " + Bytes.toStringBinary(key));
Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));
// Now read all WALs. In two dirs. Presumes certain layout.
Path walsDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
LOG.info("Running Search with keys inputDir=" + inputDir +", numMappers=" + numMappers +
" against " + getConf().get(HConstants.HBASE_DIR));
int ret = ToolRunner.run(new WALSearcher(getConf()), new String [] {walsDir.toString(), ""});
if (ret != 0) return ret;
return ToolRunner.run(new WALSearcher(getConf()), new String [] {oldWalsDir.toString(), ""});
}
static SortedSet<byte []> readKeysToSearch(final Configuration conf)
throws IOException, InterruptedException {
Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
FileSystem fs = FileSystem.get(conf);
SortedSet<byte []> result = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
if (!fs.exists(keysInputDir)) {
throw new FileNotFoundException(keysInputDir.toString());
}
if (!fs.isDirectory(keysInputDir)) {
throw new UnsupportedOperationException("TODO");
} else {
RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
while(iterator.hasNext()) {
LocatedFileStatus keyFileStatus = iterator.next();
// Skip "_SUCCESS" file.
if (keyFileStatus.getPath().getName().startsWith("_")) continue;
result.addAll(readFileToSearch(conf, fs, keyFileStatus));
}
}
return result;
}
private static SortedSet<byte []> readFileToSearch(final Configuration conf,
final FileSystem fs, final LocatedFileStatus keyFileStatus)
throws IOException, InterruptedException {
SortedSet<byte []> result = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
// Return entries that are flagged Counts.UNDEFINED in the value. Return the row. This is
// what is missing.
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr =
new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) {
InputSplit is =
new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(), new String [] {});
rr.initialize(is, context);
while (rr.nextKeyValue()) {
rr.getCurrentKey();
BytesWritable bw = rr.getCurrentValue();
switch (Verify.VerifyReducer.whichType(bw.getBytes())) {
case UNDEFINED:
byte [] key = new byte [rr.getCurrentKey().getLength()];
System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0,
rr.getCurrentKey().getLength());
result.add(key);
break;
}
}
}
return result;
}
}
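
As a rough editorial illustration (not part of this diff), the Search step above can be driven programmatically like the other ITBLL tools; the keys directory below is hypothetical, and same-package access to the package-private Search class is assumed:

// Point the tool at the output of a previous Verify run and give it a mapper count.
Configuration conf = HBaseConfiguration.create();
int exitCode = ToolRunner.run(conf, new IntegrationTestBigLinkedList.Search(),
    new String[] { "/user/hbase/IntegrationTestBigLinkedList/verify-output", "4" });
System.out.println("search exited with " + exitCode);

From the shell, the same step is reachable through the new 'search' subcommand registered in runTestFromCommandLine further down in this diff.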
/**
* A Map Reduce job that verifies that the linked lists generated by
* {@link Generator} do not have any holes.
@ -594,21 +764,84 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
}
}
/**
* Don't change the order of these enums. Their ordinals are used as a type flag when we emit
* problems found by the reducer.
*/
public static enum Counts {
UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES
UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES, EXTRA_UNDEF_REFERENCES
}
public static class VerifyReducer extends Reducer<BytesWritable,BytesWritable,Text,Text> {
/**
* Per reducer, we output problem rows as byte arrays so they can be used as input for
* subsequent investigative mapreduce jobs. Each emitted value is prefaced by a flag saying
* what sort of emission it is; the flag is the Counts enum ordinal serialized as a short.
*/
public static class VerifyReducer
extends Reducer<BytesWritable,BytesWritable,BytesWritable,BytesWritable> {
private ArrayList<byte[]> refs = new ArrayList<byte[]>();
private final BytesWritable UNREF =
new BytesWritable(addPrefixFlag(Counts.UNREFERENCED.ordinal(), new byte [] {}));
private AtomicInteger rows = new AtomicInteger(0);
private Connection connection;
@Override
protected void setup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
throws IOException, InterruptedException {
super.setup(context);
this.connection = ConnectionFactory.createConnection(context.getConfiguration());
}
@Override
protected void cleanup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
throws IOException, InterruptedException {
if (this.connection != null) this.connection.close();
super.cleanup(context);
}
/**
* @param ordinal
* @param r
* @return New byte array that has <code>ordinal</code> as a prefix taking up
* Bytes.SIZEOF_SHORT bytes, followed by <code>r</code>
*/
public static byte [] addPrefixFlag(final int ordinal, final byte [] r) {
byte [] prefix = Bytes.toBytes((short)ordinal);
if (prefix.length != Bytes.SIZEOF_SHORT) {
throw new RuntimeException("Unexpected size: " + prefix.length);
}
byte [] result = new byte [prefix.length + r.length];
System.arraycopy(prefix, 0, result, 0, prefix.length);
System.arraycopy(r, 0, result, prefix.length, r.length);
return result;
}
/**
* @param bs
* @return Type from the Counts enum of this row. Reads prefix added by
* {@link #addPrefixFlag(int, byte[])}
*/
public static Counts whichType(final byte [] bs) {
int ordinal = Bytes.toShort(bs, 0, Bytes.SIZEOF_SHORT);
return Counts.values()[ordinal];
}
/**
* @param bw
* @return Row bytes minus the type flag.
*/
public static byte [] getRowOnly(BytesWritable bw) {
byte [] bytes = new byte [bw.getLength() - Bytes.SIZEOF_SHORT];
System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0, bytes.length);
return bytes;
}
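
For reference, here is a minimal editorial sketch (not from the commit) of how the three helpers above round-trip, assuming it runs in the scope of the Verify class; the row key is hypothetical:

byte [] row = Bytes.toBytes("somerow");  // hypothetical row key
byte [] flagged = addPrefixFlag(Counts.UNDEFINED.ordinal(), row);
// The first Bytes.SIZEOF_SHORT bytes carry the type; the remainder is the row itself.
assert whichType(flagged) == Counts.UNDEFINED;
assert Bytes.equals(getRowOnly(new BytesWritable(flagged)), row);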
@Override
public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
throws IOException, InterruptedException {
throws IOException, InterruptedException {
int defCount = 0;
refs.clear();
for (BytesWritable type : values) {
if (type.getLength() == DEF.getLength()) {
@ -621,48 +854,110 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
}
// TODO check for more than one def, should not happen
StringBuilder refsSb = null;
String keyString = null;
String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
if (defCount == 0 || refs.size() != 1) {
refsSb = new StringBuilder();
String comma = "";
for (byte[] ref : refs) {
refsSb.append(comma);
comma = ",";
refsSb.append(Bytes.toStringBinary(ref));
}
keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
LOG.error("Linked List error: Key = " + keyString + " References = " + refsSb.toString());
refsSb = dumpExtraInfoOnRefs(key, context, refs);
LOG.error("LinkedListError: key=" + keyString + ", reference(s)=" +
(refsSb != null? refsSb.toString(): ""));
}
if (defCount == 0 && refs.size() > 0) {
// this is bad, found a node that is referenced but not defined. It must have been
// This is bad, found a node that is referenced but not defined. It must have been
// lost, emit some info about this node for debugging purposes.
context.write(new Text(keyString), new Text(refsSb.toString()));
context.getCounter(Counts.UNDEFINED).increment(1);
// Write out a line per reference. If more than one, flag it.
for (int i = 0; i < refs.size(); i++) {
byte [] bs = refs.get(i);
int ordinal;
if (i <= 0) {
ordinal = Counts.UNDEFINED.ordinal();
context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
context.getCounter(Counts.UNDEFINED).increment(1);
} else {
ordinal = Counts.EXTRA_UNDEF_REFERENCES.ordinal();
context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
}
}
if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
// Print out missing row; doing get on reference gives info on when the referencer
// was added which can help a little debugging. This info is only available in mapper
// output -- the 'Linked List error Key...' log message above. What we emit here is
// useless for debugging.
context.getCounter("undef", keyString).increment(1);
}
} else if (defCount > 0 && refs.size() == 0) {
// node is defined but not referenced
context.write(new Text(keyString), new Text("none"));
context.write(key, UNREF);
context.getCounter(Counts.UNREFERENCED).increment(1);
if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
context.getCounter("unref", keyString).increment(1);
}
} else {
if (refs.size() > 1) {
if (refsSb != null) {
context.write(new Text(keyString), new Text(refsSb.toString()));
// Skip first reference.
for (int i = 1; i < refs.size(); i++) {
context.write(key,
new BytesWritable(addPrefixFlag(Counts.EXTRAREFERENCES.ordinal(), refs.get(i))));
}
context.getCounter(Counts.EXTRAREFERENCES).increment(refs.size() - 1);
}
// node is defined and referenced
context.getCounter(Counts.REFERENCED).increment(1);
}
}
/**
* Dump out extra info around references if there are any. Helps debugging.
* @return StringBuilder filled with references if any.
* @throws IOException
*/
private StringBuilder dumpExtraInfoOnRefs(final BytesWritable key, final Context context,
final List<byte []> refs)
throws IOException {
StringBuilder refsSb = null;
if (refs.isEmpty()) return refsSb;
refsSb = new StringBuilder();
String comma = "";
// If a row is a reference but has no define, print the content of the row that has
// this row as a 'prev'; it will help debug. The missing row was written just before
// the row we are dumping out here.
TableName tn = getTableName(context.getConfiguration());
try (Table t = this.connection.getTable(tn)) {
for (byte [] ref : refs) {
Result r = t.get(new Get(ref));
List<Cell> cells = r.listCells();
String ts = (cells != null && !cells.isEmpty())?
new java.util.Date(cells.get(0).getTimestamp()).toString(): "";
byte [] b = r.getValue(FAMILY_NAME, COLUMN_CLIENT);
String jobStr = (b != null && b.length > 0)? Bytes.toString(b): "";
b = r.getValue(FAMILY_NAME, COLUMN_COUNT);
long count = (b != null && b.length > 0)? Bytes.toLong(b): -1;
b = r.getValue(FAMILY_NAME, COLUMN_PREV);
String refRegionLocation = "";
String keyRegionLocation = "";
if (b != null && b.length > 0) {
try (RegionLocator rl = this.connection.getRegionLocator(tn)) {
HRegionLocation hrl = rl.getRegionLocation(b);
if (hrl != null) refRegionLocation = hrl.toString();
// Key here probably has trailing zeros on it.
hrl = rl.getRegionLocation(key.getBytes());
if (hrl != null) keyRegionLocation = hrl.toString();
}
}
LOG.error("Extras on ref without a def, ref=" + Bytes.toStringBinary(ref) +
", refPrevEqualsKey=" +
(Bytes.compareTo(key.getBytes(), 0, key.getLength(), b, 0, b.length) == 0) +
", key=" + Bytes.toStringBinary(key.getBytes(), 0, key.getLength()) +
", ref row date=" + ts + ", jobStr=" + jobStr +
", ref row count=" + count +
", ref row regionLocation=" + refRegionLocation +
", key row regionLocation=" + keyRegionLocation);
refsSb.append(comma);
comma = ",";
refsSb.append(Bytes.toStringBinary(ref));
}
}
return refsSb;
}
}
@ -707,7 +1002,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
job.setReducerClass(VerifyReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(BytesWritable.class);
TextOutputFormat.setOutputPath(job, outputDir);
boolean success = job.waitForCompletion(true);
@ -756,23 +1053,26 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
protected void handleFailure(Counters counters) throws IOException {
Configuration conf = job.getConfiguration();
HConnection conn = HConnectionManager.getConnection(conf);
TableName tableName = getTableName(conf);
CounterGroup g = counters.getGroup("undef");
Iterator<Counter> it = g.iterator();
while (it.hasNext()) {
String keyString = it.next().getName();
byte[] key = Bytes.toBytes(keyString);
HRegionLocation loc = conn.relocateRegion(tableName, key);
LOG.error("undefined row " + keyString + ", " + loc);
}
g = counters.getGroup("unref");
it = g.iterator();
while (it.hasNext()) {
String keyString = it.next().getName();
byte[] key = Bytes.toBytes(keyString);
HRegionLocation loc = conn.relocateRegion(tableName, key);
LOG.error("unreferred row " + keyString + ", " + loc);
try (Connection conn = ConnectionFactory.createConnection(conf)) {
try (RegionLocator rl = conn.getRegionLocator(tableName)) {
CounterGroup g = counters.getGroup("undef");
Iterator<Counter> it = g.iterator();
while (it.hasNext()) {
String keyString = it.next().getName();
byte[] key = Bytes.toBytes(keyString);
HRegionLocation loc = rl.getRegionLocation(key, true);
LOG.error("undefined row " + keyString + ", " + loc);
}
g = counters.getGroup("unref");
it = g.iterator();
while (it.hasNext()) {
String keyString = it.next().getName();
byte[] key = Bytes.toBytes(keyString);
HRegionLocation loc = rl.getRegionLocation(key, true);
LOG.error("unreferred row " + keyString + ", " + loc);
}
}
}
}
}
@ -944,7 +1244,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
}
/**
* A stand alone program that follows a linked list created by {@link Generator} and prints timing info.
* A stand alone program that follows a linked list created by {@link Generator} and prints
* timing info.
*/
private static class Walker extends Configured implements Tool {
@Override
@ -1048,7 +1349,6 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
}
private static class Clean extends Configured implements Tool {
@Override public int run(String[] args) throws Exception {
if (args.length < 1) {
System.err.println("Usage: Clean <output dir>");
@ -1136,16 +1436,17 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
private void printCommands() {
System.err.println("Commands:");
System.err.println(" Generator Map only job that generates data.");
System.err.println(" Verify A map reduce job that looks for holes. Look at the counts ");
System.err.println(" generator Map only job that generates data.");
System.err.println(" verify A map reduce job that looks for holes. Look at the counts ");
System.err.println(" after running. See REFERENCED and UNREFERENCED are ok. Any ");
System.err.println(" UNDEFINED counts are bad. Do not run with the Generator.");
System.err.println(" Walker " +
"Standalong program that starts following a linked list & emits timing info.");
System.err.println(" Print Standalone program that prints nodes in the linked list.");
System.err.println(" Delete Standalone program that deletes a single node.");
System.err.println(" Loop Program to Loop through Generator and Verify steps");
System.err.println(" Clean Program to clean all left over detritus.");
System.err.println(" walker " +
"Standalone program that starts following a linked list & emits timing info.");
System.err.println(" print Standalone program that prints nodes in the linked list.");
System.err.println(" delete Standalone program that deletes a single node.");
System.err.println(" loop Program to Loop through Generator and Verify steps");
System.err.println(" clean Program to clean all left over detritus.");
System.err.println(" search Search for missing keys.");
System.err.flush();
}
@ -1158,6 +1459,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
printUsage(this.getClass().getSimpleName() +
" <general options> COMMAND [<COMMAND options>]", "General options:", "");
printCommands();
// Have to throw an exception here to stop the processing. Looks ugly but gets message across.
throw new RuntimeException("Incorrect Number of args.");
}
toRun = args[0];
@ -1168,7 +1470,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
public int runTestFromCommandLine() throws Exception {
Tool tool = null;
if (toRun.equals("Generator")) {
if (toRun.equalsIgnoreCase("Generator")) {
tool = new Generator();
} else if (toRun.equalsIgnoreCase("Verify")) {
tool = new Verify();
@ -1184,6 +1486,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
tool = new Delete();
} else if (toRun.equalsIgnoreCase("Clean")) {
tool = new Clean();
} else if (toRun.equalsIgnoreCase("Search")) {
tool = new Search();
} else {
usage();
throw new RuntimeException("Unknown arg");
@ -1227,4 +1531,4 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(), args);
System.exit(ret);
}
}
}

View File

@ -221,9 +221,9 @@ public class HFileArchiver {
}
// otherwise we attempt to archive the store files
if (LOG.isTraceEnabled()) LOG.trace("Archiving compacted store files.");
if (LOG.isDebugEnabled()) LOG.debug("Archiving compacted store files.");
// wrap the storefile into a File
// Wrap the storefile into a File
StoreToFile getStorePath = new StoreToFile(fs);
Collection<File> storeFiles = Collections2.transform(compactedFiles, getStorePath);

View File

@ -128,14 +128,12 @@ public class WALPlayer extends Configured implements Tool {
* A mapper that writes out {@link Mutation} to be directly applied to
* a running HBase instance.
*/
static class WALMapper
protected static class WALMapper
extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> {
private Map<TableName, TableName> tables =
new TreeMap<TableName, TableName>();
private Map<TableName, TableName> tables = new TreeMap<TableName, TableName>();
@Override
public void map(WALKey key, WALEdit value,
Context context)
public void map(WALKey key, WALEdit value, Context context)
throws IOException {
try {
if (tables.isEmpty() || tables.containsKey(key.getTablename())) {
@ -150,26 +148,28 @@ public class WALPlayer extends Configured implements Tool {
// filtering WAL meta entries
if (WALEdit.isMetaEditFamily(cell.getFamily())) continue;
// A WALEdit may contain multiple operations (HBASE-3584) and/or
// multiple rows (HBASE-5229).
// Aggregate as much as possible into a single Put/Delete
// operation before writing to the context.
if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte()
|| !CellUtil.matchingRow(lastCell, cell)) {
// row or type changed, write out aggregate KVs.
if (put != null) context.write(tableOut, put);
if (del != null) context.write(tableOut, del);
if (CellUtil.isDelete(cell)) {
del = new Delete(cell.getRow());
} else {
put = new Put(cell.getRow());
// Allow a subclass filter out this cell.
if (filter(context, cell)) {
// A WALEdit may contain multiple operations (HBASE-3584) and/or
// multiple rows (HBASE-5229).
// Aggregate as much as possible into a single Put/Delete
// operation before writing to the context.
if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte()
|| !CellUtil.matchingRow(lastCell, cell)) {
// row or type changed, write out aggregate KVs.
if (put != null) context.write(tableOut, put);
if (del != null) context.write(tableOut, del);
if (CellUtil.isDelete(cell)) {
del = new Delete(cell.getRow());
} else {
put = new Put(cell.getRow());
}
}
if (CellUtil.isDelete(cell)) {
del.addDeleteMarker(cell);
} else {
put.add(cell);
}
}
if (CellUtil.isDelete(cell)) {
del.addDeleteMarker(cell);
} else {
put.add(cell);
}
lastCell = cell;
}
@ -182,18 +182,30 @@ public class WALPlayer extends Configured implements Tool {
}
}
/**
* @param cell
* @return Return true if we are to emit this cell.
*/
protected boolean filter(Context context, final Cell cell) {
return true;
}
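
The hook above is what lets the ITBLL Search job plug in its key matching; as a minimal editorial sketch (not part of this commit), any WALPlayer subclass could override it the same way, e.g. to keep only cells for a single hypothetical row. The class and row names below are made up, and the subclass is assumed to sit where it can see the protected static WALMapper:

public static class SingleRowWALMapper extends WALMapper {
  private static final byte [] ROW = Bytes.toBytes("row-of-interest");  // hypothetical row key
  @Override
  protected boolean filter(Context context, Cell cell) {
    // Emit only cells whose row matches the one we care about.
    return Bytes.equals(CellUtil.cloneRow(cell), ROW);
  }
}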
@Override
public void setup(Context context) throws IOException {
String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) {
if (tablesToUse == null && tableMap == null) {
// Then user wants all tables.
} else if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) {
// this can only happen when WALMapper is used directly by a class other than WALPlayer
throw new IOException("No tables or incorrect table mapping specified.");
}
int i = 0;
for (String table : tablesToUse) {
tables.put(TableName.valueOf(table),
if (tablesToUse != null) {
for (String table : tablesToUse) {
tables.put(TableName.valueOf(table),
TableName.valueOf(tableMap[i++]));
}
}
}
}
@ -337,4 +349,4 @@ public class WALPlayer extends Configured implements Tool {
Job job = createSubmittableJob(otherArgs);
return job.waitForCompletion(true) ? 0 : 1;
}
}
}

View File

@ -167,7 +167,7 @@ public class SplitLogManager {
/**
* Get a list of paths that need to be split given a set of server-specific directories and
* optinally a filter.
* optionally a filter.
*
* See {@link DefaultWALProvider#getServerNameFromWALDirectoryName} for more info on directory
* layout.

View File

@ -251,8 +251,8 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
int deletedFileCount = 0;
for (FileStatus file : filesToDelete) {
Path filePath = file.getPath();
if (LOG.isTraceEnabled()) {
LOG.trace("Removing: " + filePath + " from archive");
if (LOG.isDebugEnabled()) {
LOG.debug("Removing: " + filePath + " from archive");
}
try {
boolean success = this.fs.delete(filePath, false);

View File

@ -3666,11 +3666,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
internalFlushcache(null, seqid, stores.values(), status);
}
// Now delete the content of recovered edits. We're done w/ them.
for (Path file: files) {
if (!fs.delete(file, false)) {
LOG.error("Failed delete of " + file);
} else {
LOG.debug("Deleted recovered.edits file=" + file);
if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
// For debugging data loss issues!
// If this flag is set, make use of the hfile archiving by making recovered.edits a fake
// column family. Have to fake out file type too by casting our recovered.edits as storefiles
String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName();
Set<StoreFile> fakeStoreFiles = new HashSet<StoreFile>(files.size());
for (Path file: files) {
fakeStoreFiles.add(new StoreFile(getRegionFileSystem().getFileSystem(), file, this.conf,
null, null));
}
getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
} else {
for (Path file: files) {
if (!fs.delete(file, false)) {
LOG.error("Failed delete of " + file);
} else {
LOG.debug("Deleted recovered.edits file=" + file);
}
}
}
return seqid;
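
The archiving branch above is guarded by a new, default-off configuration flag; as a minimal editorial sketch (not part of the diff), it could be enabled for a data-loss investigation like so, with the flag name taken from the code above:

// Archive (rather than delete) replayed recovered.edits files; defaults to false.
Configuration conf = HBaseConfiguration.create();
conf.setBoolean("hbase.region.archive.recovered.edits", true);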
@ -3710,8 +3723,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
try {
// How many edits seen before we check elapsed time
int interval = this.conf.getInt("hbase.hstore.report.interval.edits",
2000);
int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
// How often to send a progress report (default 1/2 master timeout)
int period = this.conf.getInt("hbase.hstore.report.period", 300000);
long lastReport = EnvironmentEdgeManager.currentTime();
@ -3770,21 +3782,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
continue;
}
}
// Check this edit is for this region.
if (!Bytes.equals(key.getEncodedRegionName(),
this.getRegionInfo().getEncodedNameAsBytes())) {
skippedEdits++;
continue;
}
boolean flush = false;
for (Cell cell: val.getCells()) {
// Check this edit is for me. Also, guard against writing the special
// METACOLUMN info such as HBASE::CACHEFLUSH entries
if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY) ||
!Bytes.equals(key.getEncodedRegionName(),
this.getRegionInfo().getEncodedNameAsBytes())) {
if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
//this is a special edit, we should handle it
CompactionDescriptor compaction = WALEdit.getCompaction(cell);
if (compaction != null) {
//replay the compaction
completeCompactionMarker(compaction);
}
skippedEdits++;
continue;
}
@ -3810,10 +3825,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// Once we are over the limit, restoreEdit will keep returning true to
// flush -- but don't flush until we've played all the kvs that make up
// the WALEdit.
if (!flush) {
flush = restoreEdit(store, cell);
}
flush |= restoreEdit(store, cell);
editsCount++;
}
if (flush) {
@ -5040,6 +5052,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @return qualified path of region directory
*/
@Deprecated
@VisibleForTesting
public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
return new Path(
FSUtils.getTableDir(rootdir, info.getTable()), info.getEncodedName());

View File

@ -1018,6 +1018,7 @@ public class FSHLog implements WAL {
i.preLogArchive(p, newPath);
}
}
LOG.info("Archiving " + p + " to " + newPath);
if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
throw new IOException("Unable to rename " + p + " to " + newPath);
}

View File

@ -1204,7 +1204,7 @@ public abstract class FSUtils {
private List<String> blacklist;
/**
* Create a filter on the give filesystem with the specified blacklist
* Create a filter on the given filesystem with the specified blacklist
* @param fs filesystem to filter
* @param directoryNameBlackList list of the names of the directories to filter. If
* <tt>null</tt>, all directories are returned

View File

@ -75,7 +75,7 @@ public class WALPrettyPrinter {
// enable in order to output a single list of transactions from several files
private boolean persistentOutput;
private boolean firstTxn;
// useful for programatic capture of JSON output
// useful for programmatic capture of JSON output
private PrintStream out;
// for JSON encoding
private static final ObjectMapper MAPPER = new ObjectMapper();
@ -267,8 +267,9 @@ public class WALPrettyPrinter {
Map<String, Object> op = new HashMap<String, Object>(toStringMap(cell));
if (outputValues) op.put("value", Bytes.toStringBinary(cell.getValue()));
// check row output filter
if (row == null || ((String) op.get("row")).equals(row))
if (row == null || ((String) op.get("row")).equals(row)) {
actions.add(op);
}
}
if (actions.size() == 0)
continue;
@ -283,22 +284,16 @@ public class WALPrettyPrinter {
out.print(MAPPER.writeValueAsString(txn));
} else {
// Pretty output, complete with indentation by atomic action
out.println("Sequence " + txn.get("sequence") + " "
+ "from region " + txn.get("region") + " " + "in table "
+ txn.get("table") + " at write timestamp: " + new Date(writeTime));
out.println("Sequence=" + txn.get("sequence") + " "
+ ", region=" + txn.get("region") + " at write timestamp=" + new Date(writeTime));
for (int i = 0; i < actions.size(); i++) {
Map op = actions.get(i);
out.println(" Action:");
out.println(" row: " + op.get("row"));
out.println(" column: " + op.get("family") + ":"
+ op.get("qualifier"));
out.println(" timestamp: "
+ (new Date((Long) op.get("timestamp"))));
if(op.get("tag") != null) {
out.println("row=" + op.get("row") +
", column=" + op.get("family") + ":" + op.get("qualifier"));
if (op.get("tag") != null) {
out.println(" tag: " + op.get("tag"));
}
if (outputValues)
out.println(" value: " + op.get("value"));
if (outputValues) out.println(" value: " + op.get("value"));
}
}
}
@ -347,8 +342,6 @@ public class WALPrettyPrinter {
* Command line arguments
* @throws IOException
* Thrown upon file system errors etc.
* @throws ParseException
* Thrown if command-line parsing fails.
*/
public static void run(String[] args) throws IOException {
// create options
@ -364,7 +357,7 @@ public class WALPrettyPrinter {
WALPrettyPrinter printer = new WALPrettyPrinter();
CommandLineParser parser = new PosixParser();
List files = null;
List<?> files = null;
try {
CommandLine cmd = parser.parse(options, args);
files = cmd.getArgList();

View File

@ -49,13 +49,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
@ -73,9 +68,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagRewriteCell;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
@ -83,9 +76,9 @@ import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.master.SplitLogManager;
@ -105,9 +98,12 @@ import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.R
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.LastSequenceId;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
// imports for things that haven't moved from regionserver.wal yet.
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
@ -115,17 +111,17 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.ipc.RemoteException;
// imports for things that haven't moved from regionserver.wal yet.
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.protobuf.ServiceException;
/**
* This class is responsible for splitting up a bunch of regionserver commit log
@ -269,8 +265,7 @@ public class WALSplitter {
* log splitting implementation, splits one log file.
* @param logfile should be an actual log file.
*/
boolean splitLogFile(FileStatus logfile,
CancelableProgressable reporter) throws IOException {
boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
Preconditions.checkState(status == null);
Preconditions.checkArgument(logfile.isFile(),
"passed in file status is for something other than a regular file.");
@ -399,8 +394,9 @@ public class WALSplitter {
} finally {
String msg =
"Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions()
+ " regions; log file=" + logPath + " is corrupted = " + isCorrupted
+ " progress failed = " + progress_failed;
+ " regions; edits skipped=" + editsSkipped + "; log file=" + logPath +
", length=" + logfile.getLen() + // See if length got updated post lease recovery
", corrupted=" + isCorrupted + ", progress failed=" + progress_failed;
LOG.info(msg);
status.markComplete(msg);
}
@ -714,8 +710,8 @@ public class WALSplitter {
throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Written region seqId to file:" + newSeqIdFile + " ,newSeqId=" + newSeqId
+ " ,maxSeqId=" + maxSeqId);
LOG.debug("Wrote region seqId=" + newSeqIdFile + " to file, newSeqId=" + newSeqId
+ ", maxSeqId=" + maxSeqId);
}
} catch (FileAlreadyExistsException ignored) {
// latest hdfs throws this exception. it's all right if newSeqIdFile already exists

Binary file not shown.

View File

@ -2526,14 +2526,14 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
* Stops the previously started <code>MiniMRCluster</code>.
*/
public void shutdownMiniMapReduceCluster() {
LOG.info("Stopping mini mapreduce cluster...");
if (mrCluster != null) {
LOG.info("Stopping mini mapreduce cluster...");
mrCluster.shutdown();
mrCluster = null;
LOG.info("Mini mapreduce cluster stopped");
}
// Restore configuration to point to local jobtracker
conf.set("mapreduce.jobtracker.address", "local");
LOG.info("Mini mapreduce cluster stopped");
}
/**

View File

@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mortbay.log.Log;
/**
* Tests around replay of recovered.edits content.
*/
@Category({MediumTests.class})
public class TestRecoveredEdits {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@Rule public TestName testName = new TestName();
/**
* HBASE-12782 ITBLL fails for me if generator does anything but 5M per maptask.
* Create a region. Close it. Then copy into place a file to replay, one that is bigger than
* configured flush size so we bring on lots of flushes. Then reopen and confirm all edits
* made it in.
* @throws IOException
*/
@Test (timeout=30000)
public void testReplayWorksThoughLotsOfFlushing() throws IOException {
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
// Set it so we flush every 1M or so. That's a lot.
conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024*1024);
// The file of recovered edits has a column family of 'meta'. Also has an encoded regionname
// of 4823016d8fca70b25503ee07f4c6d79f which needs to match on replay.
final String encodedRegionName = "4823016d8fca70b25503ee07f4c6d79f";
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(testName.getMethodName()));
final String columnFamily = "meta";
byte [][] columnFamilyAsByteArray = new byte [][] {Bytes.toBytes(columnFamily)};
htd.addFamily(new HColumnDescriptor(columnFamily));
HRegionInfo hri = new HRegionInfo(htd.getTableName()) {
@Override
public synchronized String getEncodedName() {
return encodedRegionName;
}
// Cache the name because lots of lookups.
private byte [] encodedRegionNameAsBytes = null;
@Override
public synchronized byte[] getEncodedNameAsBytes() {
if (encodedRegionNameAsBytes == null) {
this.encodedRegionNameAsBytes = Bytes.toBytes(getEncodedName());
}
return this.encodedRegionNameAsBytes;
}
};
Path hbaseRootDir = TEST_UTIL.getDataTestDir();
HRegion region = HRegion.createHRegion(hri, hbaseRootDir, conf, htd, null);
assertEquals(encodedRegionName, region.getRegionInfo().getEncodedName());
List<String> storeFiles = region.getStoreFileList(columnFamilyAsByteArray);
// There should be no store files.
assertTrue(storeFiles.isEmpty());
region.close();
Path regionDir = region.getRegionDir(hbaseRootDir, hri);
Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir);
// This is a little fragile: it builds the path to a test data file holding ~10M of edits.
Path recoveredEditsFile = new Path(new Path(
System.getProperty("project.build.testSourceDirectory", "src" + Path.SEPARATOR + "test"),
"data"), "0000000000000016310");
// Copy this file under the region's recovered.edits dir so it is replayed on reopen.
FileSystem fs = FileSystem.get(conf);
Path destination = new Path(recoveredEditsDir, recoveredEditsFile.getName());
fs.copyToLocalFile(recoveredEditsFile, destination);
assertTrue(fs.exists(destination));
// Now the file 0000000000000016310 is under recovered.edits, reopen the region to replay.
region = HRegion.openHRegion(region, null);
assertEquals(encodedRegionName, region.getRegionInfo().getEncodedName());
storeFiles = region.getStoreFileList(columnFamilyAsByteArray);
// Our 0000000000000016310 is 10MB. Most of the edits are for one region. Let's assume that if
// we flush at 1MB, there are at least 3 flushed files present because of the
// replay of edits.
assertTrue("Files count=" + storeFiles.size(), storeFiles.size() > 10);
// Now verify all edits made it into the region.
int count = verifyAllEditsMadeItIn(fs, conf, recoveredEditsFile, region);
Log.info("Checked " + count + " edits made it in");
}
/**
* @param fs
* @param conf
* @param edits
* @param region
* @return Return how many edits seen.
* @throws IOException
*/
private int verifyAllEditsMadeItIn(final FileSystem fs, final Configuration conf,
final Path edits, final HRegion region)
throws IOException {
int count = 0;
// Based on HRegion#replayRecoveredEdits
WAL.Reader reader = null;
try {
reader = WALFactory.createReader(fs, edits, conf);
WAL.Entry entry;
while ((entry = reader.next()) != null) {
WALKey key = entry.getKey();
WALEdit val = entry.getEdit();
count++;
// Check this edit is for this region.
if (!Bytes.equals(key.getEncodedRegionName(),
region.getRegionInfo().getEncodedNameAsBytes())) {
continue;
}
Cell previous = null;
for (Cell cell: val.getCells()) {
if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) continue;
if (previous != null && CellComparator.compareRows(previous, cell) == 0) continue;
previous = cell;
Get g = new Get(CellUtil.cloneRow(cell));
Result r = region.get(g);
boolean found = false;
for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
Cell current = scanner.current();
if (CellComparator.compare(cell, current, true) == 0) {
found = true;
break;
}
}
assertTrue("Failed to find " + cell, found);
}
}
} finally {
if (reader != null) reader.close();
}
return count;
}
}