From 0717d134167e165bf6d40845f4f102e8ab2145b9 Mon Sep 17 00:00:00 2001 From: larsh Date: Thu, 12 Apr 2012 22:25:46 +0000 Subject: [PATCH] HBASE-5604 M/R tool to replay WAL files git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1325555 13f79535-47bb-0310-9956-ffa450edef68 --- src/docbkx/ops_mgt.xml | 19 +- .../hbase/mapreduce/HLogInputFormat.java | 268 +++++++++++++++ .../hadoop/hbase/mapreduce/WALPlayer.java | 309 ++++++++++++++++++ .../hbase/mapreduce/TestHLogRecordReader.java | 240 ++++++++++++++ .../hadoop/hbase/mapreduce/TestWALPlayer.java | 121 +++++++ 5 files changed, 956 insertions(+), 1 deletion(-) create mode 100644 src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java create mode 100644 src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java create mode 100644 src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java create mode 100644 src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java diff --git a/src/docbkx/ops_mgt.xml b/src/docbkx/ops_mgt.xml index 5b1406409e0..7c52dd498c7 100644 --- a/src/docbkx/ops_mgt.xml +++ b/src/docbkx/ops_mgt.xml @@ -148,7 +148,24 @@ This page currently exists on the website and will eventually be migrated into the RefGuide. -\
+
+
 WALPlayer
 WALPlayer is a utility to replay WAL files into HBase.

 The WAL can be replayed for a set of tables or for all tables, and a time range can be provided (in milliseconds). The WAL is filtered to this set of tables, and the output can optionally be mapped to another set of tables.

 WALPlayer can also generate HFiles for later bulk importing; in that case only a single table can be specified, and no mapping is allowed.

 Invoke via:
$ bin/hbase org.apache.hadoop.hbase.mapreduce.WALPlayer [options] <wal inputdir> <tables> [<tableMappings>]

 For example:
$ bin/hbase org.apache.hadoop.hbase.mapreduce.WALPlayer /backuplogdir oldTable1,oldTable2 newTable1,newTable2
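 To generate HFiles for a later bulk load instead of writing to a live cluster, pass the hlog.bulk.output option; the hlog.start.time and hlog.end.time options restrict the time range and accept either milliseconds or a date of the form 2001-02-20T16:35:06.99. A sketch of such an invocation (the output path, dates, and table name are only placeholders):
$ bin/hbase org.apache.hadoop.hbase.mapreduce.WALPlayer \
    -Dhlog.bulk.output=/tmp/walplayer-hfiles \
    -Dhlog.start.time=2012-04-01T00:00:00.00 \
    -Dhlog.end.time=2012-04-10T00:00:00.00 \
    /backuplogdir oldTable1

 The generated HFiles can then be imported with HBase's bulk load tooling (for example, completebulkload).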
+
RowCounter RowCounter is a utility that will count all the rows of a table. This is a good utility to use as a sanity check to ensure that HBase can read all the blocks of a table if there are any concerns of metadata inconsistency. diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java new file mode 100644 index 00000000000..747063c6f58 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Simple {@link InputFormat} for {@link HLog} files. + */ +@InterfaceAudience.Public +public class HLogInputFormat extends InputFormat { + private static Log LOG = LogFactory.getLog(HLogInputFormat.class); + + public static String START_TIME_KEY = "hlog.start.time"; + public static String END_TIME_KEY = "hlog.end.time"; + + /** + * {@link InputSplit} for {@link HLog} files. Each split represent + * exactly one log file. + */ + static class HLogSplit extends InputSplit implements Writable { + private String logFileName; + private long fileSize; + private long startTime; + private long endTime; + + /** for serialization */ + public HLogSplit() {} + + /** + * Represent an HLogSplit, i.e. a single HLog file. + * Start- and EndTime are managed by the split, so that HLog files can be + * filtered before WALEdits are passed to the mapper(s). 
+ * @param logFileName + * @param fileSize + * @param startTime + * @param endTime + */ + public HLogSplit(String logFileName, long fileSize, long startTime, long endTime) { + this.logFileName = logFileName; + this.fileSize = fileSize; + this.startTime = startTime; + this.endTime = endTime; + } + + @Override + public long getLength() throws IOException, InterruptedException { + return fileSize; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + // TODO: Find the data node with the most blocks for this HLog? + return new String[] {}; + } + + public String getLogFileName() { + return logFileName; + } + + public long getStartTime() { + return startTime; + } + + public long getEndTime() { + return endTime; + } + + @Override + public void readFields(DataInput in) throws IOException { + logFileName = in.readUTF(); + fileSize = in.readLong(); + startTime = in.readLong(); + endTime = in.readLong(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(logFileName); + out.writeLong(fileSize); + out.writeLong(startTime); + out.writeLong(endTime); + } + + @Override + public String toString() { + return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize; + } + } + + /** + * {@link RecordReader} for an {@link HLog} file. + */ + static class HLogRecordReader extends RecordReader { + private HLog.Reader reader = null; + private HLog.Entry currentEntry = new HLog.Entry(); + private long startTime; + private long endTime; + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + HLogSplit hsplit = (HLogSplit)split; + Path logFile = new Path(hsplit.getLogFileName()); + Configuration conf = context.getConfiguration(); + LOG.info("Opening reader for "+split); + try { + this.reader = HLog.getReader(logFile.getFileSystem(conf), logFile, conf); + } catch (EOFException x) { + LOG.info("Ignoring corrupted HLog file: " + logFile + + " (This is normal when a RegionServer crashed.)"); + } + this.startTime = hsplit.getStartTime(); + this.endTime = hsplit.getEndTime(); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (reader == null) return false; + + HLog.Entry temp; + long i = -1; + do { + // skip older entries + try { + temp = reader.next(currentEntry); + i++; + } catch (EOFException x) { + LOG.info("Corrupted entry detected. Ignoring the rest of the file." 
+ + " (This is normal when a RegionServer crashed.)"); + return false; + } + } + while(temp != null && temp.getKey().getWriteTime() < startTime); + + if (temp == null) { + if (i > 0) LOG.info("Skipped " + i + " entries."); + LOG.info("Reached end of file."); + return false; + } else if (i > 0) { + LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + "."); + } + boolean res = temp.getKey().getWriteTime() <= endTime; + if (!res) { + LOG.info("Reached ts: " + temp.getKey().getWriteTime() + " ignoring the rest of the file."); + } + return res; + } + + @Override + public HLogKey getCurrentKey() throws IOException, InterruptedException { + return currentEntry.getKey(); + } + + @Override + public WALEdit getCurrentValue() throws IOException, InterruptedException { + return currentEntry.getEdit(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + // N/A depends on total number of entries, which is unknown + return 0; + } + + @Override + public void close() throws IOException { + LOG.info("Closing reader"); + if (reader != null) this.reader.close(); + } + } + + @Override + public List getSplits(JobContext context) throws IOException, + InterruptedException { + Configuration conf = context.getConfiguration(); + Path inputDir = new Path(conf.get("mapred.input.dir")); + + long startTime = conf.getLong(START_TIME_KEY, Long.MIN_VALUE); + long endTime = conf.getLong(END_TIME_KEY, Long.MAX_VALUE); + + FileSystem fs = inputDir.getFileSystem(conf); + List files = getFiles(fs, inputDir, startTime, endTime); + + List splits = new ArrayList(files.size()); + for (FileStatus file : files) { + splits.add(new HLogSplit(file.getPath().toString(), file.getLen(), startTime, endTime)); + } + return splits; + } + + private List getFiles(FileSystem fs, Path dir, long startTime, long endTime) + throws IOException { + List result = new ArrayList(); + LOG.debug("Scanning " + dir.toString() + " for HLog files"); + + FileStatus[] files = fs.listStatus(dir); + if (files == null) return Collections.emptyList(); + for (FileStatus file : files) { + if (file.isDir()) { + // recurse into sub directories + result.addAll(getFiles(fs, file.getPath(), startTime, endTime)); + } else { + String name = file.getPath().toString(); + int idx = name.lastIndexOf('.'); + if (idx > 0) { + try { + long fileStartTime = Long.parseLong(name.substring(idx+1)); + if (fileStartTime <= endTime) { + LOG.info("Found: " + name); + result.add(file); + } + } catch (NumberFormatException x) { + idx = 0; + } + } + if (idx == 0) { + LOG.warn("File " + name + " does not appear to be an HLog file. Skipping..."); + } + } + } + return result; + } + + @Override + public RecordReader createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException, InterruptedException { + return new HLogRecordReader(); + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java new file mode 100644 index 00000000000..9b1f239b423 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -0,0 +1,309 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * A tool to replay WAL files as a M/R job. + * The WAL can be replayed for a set of tables or all tables, + * and a timerange can be provided (in milliseconds). + * The WAL is filtered to the passed set of tables and the output + * can optionally be mapped to another set of tables. + * + * WAL replay can also generate HFiles for later bulk importing, + * in that case the WAL is replayed for a single table only. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class WALPlayer extends Configured implements Tool { + final static String NAME = "WALPlayer"; + final static String BULK_OUTPUT_CONF_KEY = "hlog.bulk.output"; + final static String HLOG_INPUT_KEY = "hlog.input.dir"; + final static String TABLES_KEY = "hlog.input.tables"; + final static String TABLE_MAP_KEY = "hlog.input.tablesmap"; + + /** + * A mapper that just writes out KeyValues. 
+ * This one can be used together with {@link KeyValueSortReducer} + */ + static class HLogKeyValueMapper + extends Mapper { + private byte[] table; + + @Override + public void map(HLogKey key, WALEdit value, + Context context) + throws IOException { + try { + // skip all other tables + if (Bytes.equals(table, key.getTablename())) { + for (KeyValue kv : value.getKeyValues()) { + if (HLog.isMetaFamily(kv.getFamily())) continue; + context.write(new ImmutableBytesWritable(kv.getRow()), kv); + } + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Override + public void setup(Context context) throws IOException { + // only a single table is supported when HFiles are generated with HFileOutputFormat + String tables[] = context.getConfiguration().getStrings(TABLES_KEY); + if (tables == null || tables.length != 1) { + // this can only happen when HLogMapper is used directly by a class other than WALPlayer + throw new IOException("Exactly one table must be specified for bulk HFile case."); + } + table = Bytes.toBytes(tables[0]); + } + } + + /** + * A mapper that writes out {@link Mutation} to be directly applied to + * a running HBase instance. + */ + static class HLogMapper + extends Mapper { + private Map tables = new TreeMap(Bytes.BYTES_COMPARATOR); + + @Override + public void map(HLogKey key, WALEdit value, + Context context) + throws IOException { + try { + if (tables.isEmpty() || tables.containsKey(key.getTablename())) { + byte[] targetTable = tables.isEmpty() ? + key.getTablename() : + tables.get(key.getTablename()); + ImmutableBytesWritable tableOut = new ImmutableBytesWritable(targetTable); + Put put = null; + Delete del = null; + KeyValue lastKV = null; + for (KeyValue kv : value.getKeyValues()) { + // filtering HLog meta entries, see HLog.completeCacheFlushLogEdit + if (HLog.isMetaFamily(kv.getFamily())) continue; + + // A WALEdit may contain multiple operations (HBASE-3584) and/or + // multiple rows (HBASE-5229). + // Aggregate as much as possible into a single Put/Delete + // operation before writing to the context. + if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) { + // row or type changed, write out aggregate KVs. + if (put != null) context.write(tableOut, put); + if (del != null) context.write(tableOut, del); + + if (kv.isDelete()) { + del = new Delete(kv.getRow()); + } else { + put = new Put(kv.getRow()); + } + } + if (kv.isDelete()) { + del.addDeleteMarker(kv); + } else { + put.add(kv); + } + lastKV = kv; + } + // write residual KVs + if (put != null) context.write(tableOut, put); + if (del != null) context.write(tableOut, del); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Override + public void setup(Context context) throws IOException { + String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY); + String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY); + if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) { + // this can only happen when HLogMapper is used directly by a class other than WALPlayer + throw new IOException("No tables or incorrect table mapping specified."); + } + int i = 0; + for (String table : tablesToUse) { + tables.put(Bytes.toBytes(table), Bytes.toBytes(tableMap[i++])); + } + } + } + + /** + * @param conf The {@link Configuration} to use. 
+ */ + public WALPlayer(Configuration conf) { + super(conf); + } + + void setupTime(Configuration conf, String option) throws IOException { + String val = conf.get(option); + if (val == null) return; + long ms; + try { + // first try to parse in user friendly form + ms = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS").parse(val).getTime(); + } catch (ParseException pe) { + try { + // then see if just a number of ms's was specified + ms = Long.parseLong(val); + } catch (NumberFormatException nfe) { + throw new IOException(option + + " must be specified either in the form 2001-02-20T16:35:06.99 " + + "or as number of milliseconds"); + } + } + conf.setLong(option, ms); + } + + /** + * Sets up the actual job. + * + * @param args The command line parameters. + * @return The newly created job. + * @throws IOException When setting up the job fails. + */ + public Job createSubmittableJob(String[] args) + throws IOException { + Configuration conf = getConf(); + setupTime(conf, HLogInputFormat.START_TIME_KEY); + setupTime(conf, HLogInputFormat.END_TIME_KEY); + Path inputDir = new Path(args[0]); + String[] tables = args[1].split(","); + String[] tableMap; + if (args.length > 2) { + tableMap = args[2].split(","); + if (tableMap.length != tables.length) { + throw new IOException("The same number of tables and mapping must be provided."); + } + } else { + // if not mapping is specified map each table to itself + tableMap = tables; + } + conf.setStrings(TABLES_KEY, tables); + conf.setStrings(TABLE_MAP_KEY, tableMap); + Job job = new Job(conf, NAME + "_" + inputDir); + job.setJarByClass(WALPlayer.class); + FileInputFormat.setInputPaths(job, inputDir); + job.setInputFormatClass(HLogInputFormat.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); + if (hfileOutPath != null) { + // the bulk HFile case + if (tables.length != 1) { + throw new IOException("Exactly one table must be specified for the bulk export option"); + } + HTable table = new HTable(conf, tables[0]); + job.setMapperClass(HLogKeyValueMapper.class); + job.setReducerClass(KeyValueSortReducer.class); + Path outputDir = new Path(hfileOutPath); + FileOutputFormat.setOutputPath(job, outputDir); + job.setMapOutputValueClass(KeyValue.class); + HFileOutputFormat.configureIncrementalLoad(job, table); + TableMapReduceUtil.addDependencyJars(job.getConfiguration(), + com.google.common.base.Preconditions.class); + } else { + // output to live cluster + job.setMapperClass(HLogMapper.class); + job.setOutputFormatClass(MultiTableOutputFormat.class); + TableMapReduceUtil.addDependencyJars(job); + // No reducers. + job.setNumReduceTasks(0); + } + return job; + } + + /* + * @param errorMsg Error message. Can be null. + */ + private void usage(final String errorMsg) { + if (errorMsg != null && errorMsg.length() > 0) { + System.err.println("ERROR: " + errorMsg); + } + System.err.println("Usage: " + NAME + " [options] []"); + System.err.println("Read all WAL entries for ."); + System.err.println("If no tables (\"\") are specific, all tables are imported."); + System.err.println("(Careful, even -ROOT- and .META. 
entries will be imported in that case.)"); + System.err.println("Otherwise is a comma separated list of tables.\n"); + System.err.println("The WAL entries can be mapped to new set of tables via ."); + System.err.println(" is a command separated list of targettables."); + System.err.println("If specified, each table in must have a mapping.\n"); + System.err.println("By default " + NAME + " will load data directly into HBase."); + System.err.println("To generate HFiles for a bulk data load instead, pass the option:"); + System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output"); + System.err.println(" (Only one table can be specified, and no mapping is allowed!)"); + System.err.println("Other options: (specify time range to WAL edit to consider)"); + System.err.println(" -D" + HLogInputFormat.START_TIME_KEY + "=[date|ms]"); + System.err.println(" -D" + HLogInputFormat.END_TIME_KEY + "=[date|ms]"); + System.err.println("For performance also consider the following options:\n" + + " -Dmapred.map.tasks.speculative.execution=false\n" + + " -Dmapred.reduce.tasks.speculative.execution=false"); + } + + /** + * Main entry point. + * + * @param args The command line parameters. + * @throws Exception When running the job fails. + */ + public static void main(String[] args) throws Exception { + int ret = ToolRunner.run(new WALPlayer(HBaseConfiguration.create()), args); + System.exit(ret); + } + + @Override + public int run(String[] args) throws Exception { + String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs(); + if (otherArgs.length < 2) { + usage("Wrong number of arguments: " + otherArgs.length); + System.exit(-1); + } + Job job = createSubmittableJob(otherArgs); + return job.waitForCompletion(true) ? 0 : 1; + } +} diff --git a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java new file mode 100644 index 00000000000..0b3ba838b65 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.mapreduce.HLogInputFormat.HLogRecordReader; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * JUnit tests for the HLogRecordReader + */ +@Category(MediumTests.class) +public class TestHLogRecordReader { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static Configuration conf; + private static FileSystem fs; + private static Path hbaseDir; + private static final byte [] tableName = Bytes.toBytes(getName()); + private static final byte [] rowName = tableName; + private static final HRegionInfo info = new HRegionInfo(tableName, + Bytes.toBytes(""), Bytes.toBytes(""), false); + private static final byte [] family = Bytes.toBytes("column"); + private static final byte [] value = Bytes.toBytes("value"); + private static HTableDescriptor htd; + private static Path logDir; + private static Path oldLogDir; + + private static String getName() { + return "TestHLogRecordReader"; + } + + @Before + public void setUp() throws Exception { + FileStatus[] entries = fs.listStatus(hbaseDir); + for (FileStatus dir : entries) { + fs.delete(dir.getPath(), true); + } + + } + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // Make block sizes small. 
+ conf = TEST_UTIL.getConfiguration(); + conf.setInt("dfs.blocksize", 1024 * 1024); + conf.setInt("dfs.replication", 1); + TEST_UTIL.startMiniDFSCluster(1); + + conf = TEST_UTIL.getConfiguration(); + fs = TEST_UTIL.getDFSCluster().getFileSystem(); + + hbaseDir = TEST_UTIL.createRootDir(); + logDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME); + oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME); + htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor(family)); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Test partial reads from the log based on passed time range + * @throws Exception + */ + @Test + public void testPartialRead() throws Exception { + HLog log = new HLog(fs, logDir, oldLogDir, conf); + long ts = System.currentTimeMillis(); + WALEdit edit = new WALEdit(); + edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), + ts, value)); + log.append(info, tableName, edit, + ts, htd); + edit = new WALEdit(); + edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), + ts+1, value)); + log.append(info, tableName, edit, + ts+1, htd); + log.rollWriter(); + + Thread.sleep(1); + long ts1 = System.currentTimeMillis(); + + edit = new WALEdit(); + edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), + ts1+1, value)); + log.append(info, tableName, edit, + ts1+1, htd); + edit = new WALEdit(); + edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), + ts1+2, value)); + log.append(info, tableName, edit, + ts1+2, htd); + log.close(); + + HLogInputFormat input = new HLogInputFormat(); + Configuration jobConf = new Configuration(conf); + jobConf.set("mapred.input.dir", logDir.toString()); + jobConf.setLong(HLogInputFormat.END_TIME_KEY, ts); + + // only 1st file is considered, and only its 1st entry is used + List splits = input.getSplits(new JobContext(jobConf, new JobID())); + assertEquals(1, splits.size()); + testSplit(splits.get(0), Bytes.toBytes("1")); + + jobConf.setLong(HLogInputFormat.START_TIME_KEY, ts+1); + jobConf.setLong(HLogInputFormat.END_TIME_KEY, ts1+1); + splits = input.getSplits(new JobContext(jobConf, new JobID())); + // both files need to be considered + assertEquals(2, splits.size()); + // only the 2nd entry from the 1st file is used + testSplit(splits.get(0), Bytes.toBytes("2")); + // only the 1nd entry from the 2nd file is used + testSplit(splits.get(1), Bytes.toBytes("3")); + } + + /** + * Test basic functionality + * @throws Exception + */ + @Test + public void testHLogRecordReader() throws Exception { + HLog log = new HLog(fs, logDir, oldLogDir, conf); + byte [] value = Bytes.toBytes("value"); + WALEdit edit = new WALEdit(); + edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), + System.currentTimeMillis(), value)); + log.append(info, tableName, edit, + System.currentTimeMillis(), htd); + + Thread.sleep(1); // make sure 2nd log gets a later timestamp + long secondTs = System.currentTimeMillis(); + log.rollWriter(); + + edit = new WALEdit(); + edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), + System.currentTimeMillis(), value)); + log.append(info, tableName, edit, + System.currentTimeMillis(), htd); + log.close(); + long thirdTs = System.currentTimeMillis(); + + // should have 2 log files now + HLogInputFormat input = new HLogInputFormat(); + Configuration jobConf = new Configuration(conf); + jobConf.set("mapred.input.dir", logDir.toString()); + + // make sure both logs are found + List splits = input.getSplits(new 
JobContext(jobConf, new JobID())); + assertEquals(2, splits.size()); + + // should return exactly one KV + testSplit(splits.get(0), Bytes.toBytes("1")); + // same for the 2nd split + testSplit(splits.get(1), Bytes.toBytes("2")); + + // now test basic time ranges: + + // set an endtime, the 2nd log file can be ignored completely. + jobConf.setLong(HLogInputFormat.END_TIME_KEY, secondTs-1); + splits = input.getSplits(new JobContext(jobConf, new JobID())); + assertEquals(1, splits.size()); + testSplit(splits.get(0), Bytes.toBytes("1")); + + // now set a start time + jobConf.setLong(HLogInputFormat.END_TIME_KEY, Long.MAX_VALUE); + jobConf.setLong(HLogInputFormat.START_TIME_KEY, thirdTs); + splits = input.getSplits(new JobContext(jobConf, new JobID())); + // both logs need to be considered + assertEquals(2, splits.size()); + // but both readers skip all edits + testSplit(splits.get(0)); + testSplit(splits.get(1)); + } + + /** + * Create a new reader from the split, and match the edits against the passed columns. + */ + private void testSplit(InputSplit split, byte[]... columns) throws Exception { + HLogRecordReader reader = new HLogRecordReader(); + reader.initialize(split, new TaskAttemptContext(conf, new TaskAttemptID())); + + for (byte[] column : columns) { + assertTrue(reader.nextKeyValue()); + assertTrue(Bytes + .equals(column, reader.getCurrentValue().getKeyValues().get(0).getQualifier())); + } + assertFalse(reader.nextKeyValue()); + reader.close(); + } + + @org.junit.Rule + public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = + new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); +} diff --git a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java new file mode 100644 index 00000000000..674b1c8da0e --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNull; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Basic test for the WALPlayer M/R tool + */ +public class TestWALPlayer { + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static MiniHBaseCluster cluster; + + @BeforeClass + public static void beforeClass() throws Exception { + cluster = TEST_UTIL.startMiniCluster(); + TEST_UTIL.startMiniMapReduceCluster(); + } + + @AfterClass + public static void afterClass() throws Exception { + TEST_UTIL.shutdownMiniMapReduceCluster(); + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Simple end-to-end test + * @throws Exception + */ + @Test + public void testWALPlayer() throws Exception { + final byte[] TABLENAME1 = Bytes.toBytes("testWALPlayer1"); + final byte[] TABLENAME2 = Bytes.toBytes("testWALPlayer2"); + final byte[] FAMILY = Bytes.toBytes("family"); + final byte[] COLUMN1 = Bytes.toBytes("c1"); + final byte[] COLUMN2 = Bytes.toBytes("c2"); + final byte[] ROW = Bytes.toBytes("row"); + HTable t1 = TEST_UTIL.createTable(TABLENAME1, FAMILY); + HTable t2 = TEST_UTIL.createTable(TABLENAME2, FAMILY); + + // put a row into the first table + Put p = new Put(ROW); + p.add(FAMILY, COLUMN1, COLUMN1); + p.add(FAMILY, COLUMN2, COLUMN2); + t1.put(p); + // delete one column + Delete d = new Delete(ROW); + d.deleteColumns(FAMILY, COLUMN1); + t1.delete(d); + + // replay the WAL, map table 1 to table 2 + HLog log = cluster.getRegionServer(0).getWAL(); + log.rollWriter(); + String walInputDir = new Path(cluster.getMaster().getMasterFileSystem() + .getRootDir(), HConstants.HREGION_LOGDIR_NAME).toString(); + + WALPlayer player = new WALPlayer(TEST_UTIL.getConfiguration()); + assertEquals(0, player.run(new String[] { walInputDir, Bytes.toString(TABLENAME1), + Bytes.toString(TABLENAME2) })); + + // verify the WAL was player into table 2 + Get g = new Get(ROW); + Result r = t2.get(g); + assertEquals(1, r.size()); + assertTrue(Bytes.equals(COLUMN2, r.raw()[0].getQualifier())); + } + + /** + * Simple test for data parsing + * @throws Exception + */ + @Test + public void testTimeFormat() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + WALPlayer player = new WALPlayer(TEST_UTIL.getConfiguration()); + player.setupTime(conf, HLogInputFormat.END_TIME_KEY); + // make sure if nothing is specified nothing is set + assertNull(conf.get(HLogInputFormat.END_TIME_KEY)); + // test a textual data (including ms) + conf.set(HLogInputFormat.END_TIME_KEY, "2012-4-10T14:21:01.01"); + player.setupTime(conf, HLogInputFormat.END_TIME_KEY); + assertEquals(1334092861001L, conf.getLong(HLogInputFormat.END_TIME_KEY, 0)); + // test with mss as a long + conf.set(HLogInputFormat.END_TIME_KEY, "1334092861010"); + player.setupTime(conf, 
HLogInputFormat.END_TIME_KEY); + assertEquals(1334092861010L, conf.getLong(HLogInputFormat.END_TIME_KEY, 0)); + } +}