From c3f5e1f55104828b5ae796b1726aaba52a6dc9d6 Mon Sep 17 00:00:00 2001
From: Michael Stack
Date: Thu, 17 Sep 2009 03:25:35 +0000
Subject: [PATCH] HBASE-1684 Backup (Export/Import) contrib tool for 0.20

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@816038 13f79535-47bb-0310-9956-ffa450edef68
---
 CHANGES.txt                                        |   1 +
 .../apache/hadoop/hbase/mapreduce/Driver.java      |   2 +
 .../apache/hadoop/hbase/mapreduce/Export.java      | 120 +++++++++++++++++
 .../apache/hadoop/hbase/mapreduce/Import.java      | 126 ++++++++++++++++++
 .../hadoop/hbase/mapreduce/RowCounter.java         |  39 +++---
 .../hbase/mapreduce/TableInputFormatBase.java      |   7 +-
 .../hbase/mapreduce/TableMapReduceUtil.java        |   8 +-
 7 files changed, 274 insertions(+), 29 deletions(-)
 create mode 100644 src/java/org/apache/hadoop/hbase/mapreduce/Export.java
 create mode 100644 src/java/org/apache/hadoop/hbase/mapreduce/Import.java

diff --git a/CHANGES.txt b/CHANGES.txt
index 4ef455c68a1..7ed84cfd6da 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -59,6 +59,7 @@ Release 0.21.0 - Unreleased
    HBASE-1835  Add more delete tests
    HBASE-1574  Client and server APIs to do batch deletes
    HBASE-1833  hfile.main fixes
+   HBASE-1684  Backup (Export/Import) contrib tool for 0.20
 
   OPTIMIZATIONS
 
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/Driver.java b/src/java/org/apache/hadoop/hbase/mapreduce/Driver.java
index f0e96923414..e22e2963793 100644
--- a/src/java/org/apache/hadoop/hbase/mapreduce/Driver.java
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/Driver.java
@@ -34,6 +34,8 @@ public class Driver {
     ProgramDriver pgd = new ProgramDriver();
     pgd.addClass(RowCounter.NAME, RowCounter.class,
       "Count rows in HBase table");
+    pgd.addClass(Export.NAME, Export.class, "Write table data to HDFS.");
+    pgd.addClass(Import.NAME, Import.class, "Import data written by Export.");
     pgd.driver(args);
   }
 }
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/Export.java b/src/java/org/apache/hadoop/hbase/mapreduce/Export.java
new file mode 100644
index 00000000000..2998450771e
--- /dev/null
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/Export.java
@@ -0,0 +1,120 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+/**
+ * Export an HBase table.
+ * Writes content to sequence files up in HDFS.  Use {@link Import} to read it
+ * back in again.
+ */
+public class Export {
+  final static String NAME = "export";
+
+  /**
+   * Mapper.
+   */
+  static class Exporter
+  extends TableMapper<ImmutableBytesWritable, Result> {
+    /**
+     * @param row  The current table row key.
+     * @param value  The columns.
+     * @param context  The current context.
+     * @throws IOException When something is broken with the data.
+     * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
+     *   org.apache.hadoop.mapreduce.Mapper.Context)
+     */
+    @Override
+    public void map(ImmutableBytesWritable row, Result value,
+      Context context)
+    throws IOException {
+      try {
+        context.write(row, value);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param conf  The current configuration.
+   * @param args  The command line parameters.
+   * @return The newly created job.
+   * @throws IOException When setting up the job fails.
+   */
+  public static Job createSubmittableJob(Configuration conf, String[] args)
+  throws IOException {
+    String tableName = args[0];
+    Path outputDir = new Path(args[1]);
+    Job job = new Job(conf, NAME + "_" + tableName);
+    job.setJarByClass(Exporter.class);
+    // TODO: Allow passing filter and subset of rows/columns.
+    TableMapReduceUtil.initTableMapperJob(tableName, new Scan(),
+      Exporter.class, null, null, job);
+    // No reducers.  Just write straight to output files.
+    job.setNumReduceTasks(0);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(Result.class);
+    FileOutputFormat.setOutputPath(job, outputDir);
+    return job;
+  }
+
+  /*
+   * @param errorMsg Error message.  Can be null.
+   */
+  private static void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    System.err.println("Usage: Export <tablename> <outputdir>");
+  }
+
+  /**
+   * Main entry point.
+   *
+   * @param args  The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    HBaseConfiguration conf = new HBaseConfiguration();
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    if (otherArgs.length < 2) {
+      usage("Wrong number of arguments: " + otherArgs.length);
+      System.exit(-1);
+    }
+    Job job = createSubmittableJob(conf, otherArgs);
+    System.exit(job.waitForCompletion(true) ? 0 : 1);
+  }
+}
\ No newline at end of file
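Export writes one SequenceFile record per row, keyed by the row's ImmutableBytesWritable and carrying the full Result. A minimal sketch of reading one of those files back for inspection follows; the table name "mytable", the output directory /backup/mytable, and the part-file name are invented for the example and are not produced by this patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.SequenceFile;

public class DumpExport {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HBaseConfiguration();
    FileSystem fs = FileSystem.get(conf);
    // Hypothetical location of one file written by "export mytable /backup/mytable".
    Path file = new Path("/backup/mytable/part-m-00000");
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
    ImmutableBytesWritable key = new ImmutableBytesWritable();
    Result value = new Result();
    try {
      // Each record is a row key plus the Result captured at export time.
      while (reader.next(key, value)) {
        System.out.println(Bytes.toString(key.get()) + ": " + value.size() + " KeyValues");
      }
    } finally {
      reader.close();
    }
  }
}

Because the value is the raw Result, the same files can be fed straight to the Import job below.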
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/Import.java b/src/java/org/apache/hadoop/hbase/mapreduce/Import.java
new file mode 100644
index 00000000000..fa2bfc4758c
--- /dev/null
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/Import.java
@@ -0,0 +1,126 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+/**
+ * Import data written by {@link Export}.
+ */
+public class Import {
+  final static String NAME = "import";
+
+  /**
+   * Write each exported row back into the table as a {@link Put}.
+   */
+  static class Importer
+  extends TableMapper<ImmutableBytesWritable, Put> {
+    /**
+     * @param row  The current table row key.
+     * @param value  The columns.
+     * @param context  The current context.
+     * @throws IOException When something is broken with the data.
+     * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
+     *   org.apache.hadoop.mapreduce.Mapper.Context)
+     */
+    @Override
+    public void map(ImmutableBytesWritable row, Result value,
+      Context context)
+    throws IOException {
+      try {
+        context.write(row, resultToPut(row, value));
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    private static Put resultToPut(ImmutableBytesWritable key, Result result)
+    throws IOException {
+      Put put = new Put(key.get());
+      for (KeyValue kv : result.raw()) {
+        put.add(kv);
+      }
+      return put;
+    }
+  }
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param conf  The current configuration.
+   * @param args  The command line parameters.
+   * @return The newly created job.
+   * @throws IOException When setting up the job fails.
+   */
+  public static Job createSubmittableJob(Configuration conf, String[] args)
+  throws IOException {
+    String tableName = args[0];
+    Path inputDir = new Path(args[1]);
+    Job job = new Job(conf, NAME + "_" + tableName);
+    job.setJarByClass(Importer.class);
+    FileInputFormat.setInputPaths(job, inputDir);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setMapperClass(Importer.class);
+    // No reducers.  Just write straight to table.  Call initTableReducerJob
+    // because it sets up the TableOutputFormat.
+    TableMapReduceUtil.initTableReducerJob(tableName, null, job);
+    job.setNumReduceTasks(0);
+    return job;
+  }
+
+  /*
+   * @param errorMsg Error message.  Can be null.
+   */
+  private static void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    System.err.println("Usage: Import <tablename> <inputdir>");
+  }
+
+  /**
+   * Main entry point.
+   *
+   * @param args  The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    HBaseConfiguration conf = new HBaseConfiguration();
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    if (otherArgs.length < 2) {
+      usage("Wrong number of arguments: " + otherArgs.length);
+      System.exit(-1);
+    }
+    Job job = createSubmittableJob(conf, otherArgs);
+    System.exit(job.waitForCompletion(true) ? 0 : 1);
+  }
+}
\ No newline at end of file
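Since both tools expose a public createSubmittableJob(Configuration, String[]) method, an export and a subsequent import can also be chained from code. A minimal sketch under these assumptions: a source table "mytable", a scratch directory /backup/mytable, and an already-created target table "mytable_copy" with matching column families (all names are invented for the example):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.Export;
import org.apache.hadoop.hbase.mapreduce.Import;
import org.apache.hadoop.mapreduce.Job;

public class BackupRestore {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HBaseConfiguration();
    // Dump "mytable" to SequenceFiles under /backup/mytable.
    Job export = Export.createSubmittableJob(conf,
      new String[] {"mytable", "/backup/mytable"});
    if (!export.waitForCompletion(true)) {
      System.exit(1);
    }
    // Replay the dumped rows as Puts against the copy table.
    Job restore = Import.createSubmittableJob(conf,
      new String[] {"mytable_copy", "/backup/mytable"});
    System.exit(restore.waitForCompletion(true) ? 0 : 1);
  }
}

Note that Import only replays Puts; the target table and its column families must exist before the job runs.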
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java b/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
index 125cd405f73..40e912b1688 100644
--- a/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
@@ -22,15 +22,14 @@ package org.apache.hadoop.hbase.mapreduce;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
 
 /**
@@ -49,7 +48,7 @@ public class RowCounter {
   extends TableMapper<ImmutableBytesWritable, Result> {
 
     /** Counter enumeration to count the actual rows. */
-    public static enum Counters { ROWS }
+    public static enum Counters {ROWS}
 
     /**
      * Maps the data.
@@ -72,7 +71,6 @@ public class RowCounter {
         }
       }
     }
-
   }
 
   /**
@@ -85,11 +83,12 @@ public class RowCounter {
    */
   public static Job createSubmittableJob(Configuration conf, String[] args)
   throws IOException {
-    Job job = new Job(conf, NAME);
+    String tableName = args[0];
+    Job job = new Job(conf, NAME + "_" + tableName);
     job.setJarByClass(RowCounter.class);
     // Columns are space delimited
     StringBuilder sb = new StringBuilder();
-    final int columnoffset = 2;
+    final int columnoffset = 1;
     for (int i = columnoffset; i < args.length; i++) {
       if (i > columnoffset) {
         sb.append(" ");
@@ -97,20 +96,21 @@ public class RowCounter {
       sb.append(args[i]);
     }
     Scan scan = new Scan();
-    for(String columnName : sb.toString().split(" ")) {
-      String [] fields = columnName.split(":");
-      if(fields.length == 1) {
-        scan.addFamily(Bytes.toBytes(fields[0]));
-      } else {
-        scan.addColumn(Bytes.toBytes(fields[0]), Bytes.toBytes(fields[1]));
+    if (sb.length() > 0) {
+      for (String columnName :sb.toString().split(" ")) {
+        String [] fields = columnName.split(":");
+        if(fields.length == 1) {
+          scan.addFamily(Bytes.toBytes(fields[0]));
+        } else {
+          scan.addColumn(Bytes.toBytes(fields[0]), Bytes.toBytes(fields[1]));
+        }
       }
-    }
+    }
     // Second argument is the table name.
-    TableMapReduceUtil.initTableMapperJob(args[1], scan,
+    job.setOutputFormatClass(NullOutputFormat.class);
+    TableMapReduceUtil.initTableMapperJob(tableName, scan,
       RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
     job.setNumReduceTasks(0);
-    // first argument is the output directory.
-    FileOutputFormat.setOutputPath(job, new Path(args[0]));
     return job;
   }
 
@@ -123,10 +123,9 @@ public class RowCounter {
   public static void main(String[] args) throws Exception {
     HBaseConfiguration conf = new HBaseConfiguration();
     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-    if (otherArgs.length < 3) {
+    if (otherArgs.length < 1) {
       System.err.println("ERROR: Wrong number of parameters: " + args.length);
-      System.err.println("Usage: " + NAME +
-        " <outputdir> <tablename> <column1> [<column2>...]");
+      System.err.println("Usage: RowCounter <tablename> [<column1> <column2>...]");
       System.exit(-1);
     }
     Job job = createSubmittableJob(conf, otherArgs);
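After this change RowCounter no longer takes an output directory: the table name is the first argument, optional column arguments follow, and the job writes to NullOutputFormat while reporting its result through the ROWS counter. A minimal sketch of the new calling convention, with "mytable" and the "info" family as invented examples:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.RowCounter;
import org.apache.hadoop.mapreduce.Job;

public class CountRows {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HBaseConfiguration();
    // Table name first, no output directory; the optional "info" argument
    // restricts the scan to that column family.
    Job job = RowCounter.createSubmittableJob(conf, new String[] {"mytable", "info"});
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}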
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
index 81ba8c61548..af9759011f4 100644
--- a/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
@@ -248,7 +248,7 @@ extends InputFormat {
     if (trr == null) {
       trr = new TableRecordReader();
     }
-    Scan sc = new Scan(scan);
+    Scan sc = new Scan(this.scan);
     sc.setStartRow(tSplit.getStartRow());
     sc.setStopRow(tSplit.getEndRow());
     trr.setScan(sc);
@@ -276,9 +276,6 @@ extends InputFormat {
     if (table == null) {
       throw new IOException("No table was provided.");
     }
-    if (!scan.hasFamilies()) {
-      throw new IOException("Expecting at least one column.");
-    }
     int realNumSplits = startKeys.length;
     InputSplit[] splits = new InputSplit[realNumSplits];
     int middle = startKeys.length / realNumSplits;
@@ -319,7 +316,7 @@ extends InputFormat {
    * @return The internal scan instance.
    */
   public Scan getScan() {
-    if (scan == null) scan = new Scan();
+    if (this.scan == null) this.scan = new Scan();
     return scan;
   }
 
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index 6e545662512..3ef418c4422 100644
--- a/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -58,11 +58,11 @@ public class TableMapReduceUtil {
       Class outputKeyClass, Class outputValueClass, Job job)
   throws IOException {
     job.setInputFormatClass(TableInputFormat.class);
-    job.setMapOutputValueClass(outputValueClass);
-    job.setMapOutputKeyClass(outputKeyClass);
+    if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
+    if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
     job.getConfiguration().set(TableInputFormat.INPUT_TABLE, table);
-    job.getConfiguration().set(TableInputFormat.SCAN, 
+    job.getConfiguration().set(TableInputFormat.SCAN,
       convertScanToString(scan));
   }
 
@@ -125,7 +125,7 @@ public class TableMapReduceUtil {
       Class reducer, Job job, Class partitioner)
   throws IOException {
     job.setOutputFormatClass(TableOutputFormat.class);
-    job.setReducerClass(reducer);
+    if (reducer != null) job.setReducerClass(reducer);
     job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
     job.setOutputKeyClass(ImmutableBytesWritable.class);
     job.setOutputValueClass(Put.class);
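The null checks added to TableMapReduceUtil are what let Export pass null map output key/value classes and let Import pass a null reducer while still getting TableOutputFormat set up. A minimal sketch of a map-only table-to-table copy built on the same relaxed helpers; the table names are invented and the mapper simply mirrors Import.Importer:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;

public class CopyTable {

  /** Turns every scanned Result into a Put on the target table, like Import.Importer. */
  static class CopyMapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context)
    throws IOException {
      Put put = new Put(row.get());
      for (KeyValue kv : value.raw()) {
        put.add(kv);
      }
      try {
        context.write(row, put);
      } catch (InterruptedException e) {
        throw new IOException(e.getMessage());
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new HBaseConfiguration();
    Job job = new Job(conf, "copy_mytable");
    job.setJarByClass(CopyMapper.class);
    // Nulls are now legal here: the map output classes are left untouched
    // because this job never shuffles.
    TableMapReduceUtil.initTableMapperJob("mytable", new Scan(),
      CopyMapper.class, null, null, job);
    // A null reducer just means "set up TableOutputFormat for this table".
    TableMapReduceUtil.initTableReducerJob("mytable_copy", null, job);
    job.setNumReduceTasks(0);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}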