HBASE-1684 Backup (Export/Import) contrib tool for 0.20

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@816038 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2009-09-17 03:25:35 +00:00
parent e50c404aca
commit c3f5e1f551
7 changed files with 274 additions and 29 deletions

View File

@ -59,6 +59,7 @@ Release 0.21.0 - Unreleased
HBASE-1835 Add more delete tests
HBASE-1574 Client and server APIs to do batch deletes
HBASE-1833 hfile.main fixes
HBASE-1684 Backup (Export/Import) contrib tool for 0.20
OPTIMIZATIONS

View File

@ -34,6 +34,8 @@ public class Driver {
ProgramDriver pgd = new ProgramDriver();
pgd.addClass(RowCounter.NAME, RowCounter.class,
"Count rows in HBase table");
pgd.addClass(Export.NAME, Export.class, "Write table data to HDFS.");
pgd.addClass(Import.NAME, Import.class, "Import data written by Export.");
pgd.driver(args);
}
}

View File

@ -0,0 +1,120 @@
/**
* Copyright 2009 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
* Export an HBase table.
* Writes content to sequence files up in HDFS. Use {@link Import} to read it
* back in again.
*/
public class Export {
final static String NAME = "export";
/**
* Mapper.
*/
static class Exporter
extends TableMapper<ImmutableBytesWritable, Result> {
/**
* @param row The current table row key.
* @param value The columns.
* @param context The current context.
* @throws IOException When something is broken with the data.
* @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
* org.apache.hadoop.mapreduce.Mapper.Context)
*/
@Override
public void map(ImmutableBytesWritable row, Result value,
Context context)
throws IOException {
try {
context.write(row, value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* Sets up the actual job.
*
* @param conf The current configuration.
* @param args The command line parameters.
* @return The newly created job.
* @throws IOException When setting up the job fails.
*/
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
String tableName = args[0];
Path outputDir = new Path(args[1]);
Job job = new Job(conf, NAME + "_" + tableName);
job.setJarByClass(Exporter.class);
// TODO: Allow passing filter and subset of rows/columns.
TableMapReduceUtil.initTableMapperJob(tableName, new Scan(),
Exporter.class, null, null, job);
// No reducers. Just write straight to output files.
job.setNumReduceTasks(0);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Result.class);
FileOutputFormat.setOutputPath(job, outputDir);
return job;
}
/*
* @param errorMsg Error message. Can be null.
*/
private static void usage(final String errorMsg) {
if (errorMsg != null && errorMsg.length() > 0) {
System.err.println("ERROR: " + errorMsg);
}
System.err.println("Usage: Export <tablename> <outputdir>");
}
/**
* Main entry point.
*
* @param args The command line parameters.
* @throws Exception When running the job fails.
*/
public static void main(String[] args) throws Exception {
HBaseConfiguration conf = new HBaseConfiguration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
usage("Wrong number of arguments: " + otherArgs.length);
System.exit(-1);
}
Job job = createSubmittableJob(conf, otherArgs);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

View File

@ -0,0 +1,126 @@
/**
* Copyright 2009 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
* Import data written by {@link Export}.
*/
public class Import {
final static String NAME = "import";
/**
* Write table content out to files in hdfs.
*/
static class Importer
extends TableMapper<ImmutableBytesWritable, Put> {
/**
* @param row The current table row key.
* @param value The columns.
* @param context The current context.
* @throws IOException When something is broken with the data.
* @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
* org.apache.hadoop.mapreduce.Mapper.Context)
*/
@Override
public void map(ImmutableBytesWritable row, Result value,
Context context)
throws IOException {
try {
context.write(row, resultToPut(row, value));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static Put resultToPut(ImmutableBytesWritable key, Result result)
throws IOException {
Put put = new Put(key.get());
for (KeyValue kv : result.raw()) {
put.add(kv);
}
return put;
}
}
/**
* Sets up the actual job.
*
* @param conf The current configuration.
* @param args The command line parameters.
* @return The newly created job.
* @throws IOException When setting up the job fails.
*/
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
String tableName = args[0];
Path inputDir = new Path(args[1]);
Job job = new Job(conf, NAME + "_" + tableName);
job.setJarByClass(Importer.class);
FileInputFormat.setInputPaths(job, inputDir);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setMapperClass(Importer.class);
// No reducers. Just write straight to table. Call initTableReducerJob
// because it sets up the TableOutputFormat.
TableMapReduceUtil.initTableReducerJob(tableName, null, job);
job.setNumReduceTasks(0);
return job;
}
/*
* @param errorMsg Error message. Can be null.
*/
private static void usage(final String errorMsg) {
if (errorMsg != null && errorMsg.length() > 0) {
System.err.println("ERROR: " + errorMsg);
}
System.err.println("Usage: Import <tablename> <inputdir>");
}
/**
* Main entry point.
*
* @param args The command line parameters.
* @throws Exception When running the job fails.
*/
public static void main(String[] args) throws Exception {
HBaseConfiguration conf = new HBaseConfiguration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
usage("Wrong number of arguments: " + otherArgs.length);
System.exit(-1);
}
Job job = createSubmittableJob(conf, otherArgs);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

View File

@ -22,15 +22,14 @@ package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
@ -49,7 +48,7 @@ public class RowCounter {
extends TableMapper<ImmutableBytesWritable, Result> {
/** Counter enumeration to count the actual rows. */
public static enum Counters { ROWS }
public static enum Counters {ROWS}
/**
* Maps the data.
@ -72,7 +71,6 @@ public class RowCounter {
}
}
}
}
/**
@ -85,11 +83,12 @@ public class RowCounter {
*/
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
Job job = new Job(conf, NAME);
String tableName = args[0];
Job job = new Job(conf, NAME + "_" + tableName);
job.setJarByClass(RowCounter.class);
// Columns are space delimited
StringBuilder sb = new StringBuilder();
final int columnoffset = 2;
final int columnoffset = 1;
for (int i = columnoffset; i < args.length; i++) {
if (i > columnoffset) {
sb.append(" ");
@ -97,20 +96,21 @@ public class RowCounter {
sb.append(args[i]);
}
Scan scan = new Scan();
for(String columnName : sb.toString().split(" ")) {
String [] fields = columnName.split(":");
if(fields.length == 1) {
scan.addFamily(Bytes.toBytes(fields[0]));
} else {
scan.addColumn(Bytes.toBytes(fields[0]), Bytes.toBytes(fields[1]));
if (sb.length() > 0) {
for (String columnName :sb.toString().split(" ")) {
String [] fields = columnName.split(":");
if(fields.length == 1) {
scan.addFamily(Bytes.toBytes(fields[0]));
} else {
scan.addColumn(Bytes.toBytes(fields[0]), Bytes.toBytes(fields[1]));
}
}
}
}
// Second argument is the table name.
TableMapReduceUtil.initTableMapperJob(args[1], scan,
job.setOutputFormatClass(NullOutputFormat.class);
TableMapReduceUtil.initTableMapperJob(tableName, scan,
RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
job.setNumReduceTasks(0);
// first argument is the output directory.
FileOutputFormat.setOutputPath(job, new Path(args[0]));
return job;
}
@ -123,10 +123,9 @@ public class RowCounter {
public static void main(String[] args) throws Exception {
HBaseConfiguration conf = new HBaseConfiguration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 3) {
if (otherArgs.length < 1) {
System.err.println("ERROR: Wrong number of parameters: " + args.length);
System.err.println("Usage: " + NAME +
" <outputdir> <tablename> <column1> [<column2>...]");
System.err.println("Usage: RowCounter <tablename> [<column1> <column2>...]");
System.exit(-1);
}
Job job = createSubmittableJob(conf, otherArgs);

View File

@ -248,7 +248,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
if (trr == null) {
trr = new TableRecordReader();
}
Scan sc = new Scan(scan);
Scan sc = new Scan(this.scan);
sc.setStartRow(tSplit.getStartRow());
sc.setStopRow(tSplit.getEndRow());
trr.setScan(sc);
@ -276,9 +276,6 @@ extends InputFormat<ImmutableBytesWritable, Result> {
if (table == null) {
throw new IOException("No table was provided.");
}
if (!scan.hasFamilies()) {
throw new IOException("Expecting at least one column.");
}
int realNumSplits = startKeys.length;
InputSplit[] splits = new InputSplit[realNumSplits];
int middle = startKeys.length / realNumSplits;
@ -319,7 +316,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
* @return The internal scan instance.
*/
public Scan getScan() {
if (scan == null) scan = new Scan();
if (this.scan == null) this.scan = new Scan();
return scan;
}

View File

@ -58,11 +58,11 @@ public class TableMapReduceUtil {
Class<? extends WritableComparable> outputKeyClass,
Class<? extends Writable> outputValueClass, Job job) throws IOException {
job.setInputFormatClass(TableInputFormat.class);
job.setMapOutputValueClass(outputValueClass);
job.setMapOutputKeyClass(outputKeyClass);
if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
job.setMapperClass(mapper);
job.getConfiguration().set(TableInputFormat.INPUT_TABLE, table);
job.getConfiguration().set(TableInputFormat.SCAN,
job.getConfiguration().set(TableInputFormat.SCAN,
convertScanToString(scan));
}
@ -125,7 +125,7 @@ public class TableMapReduceUtil {
Class<? extends TableReducer> reducer, Job job, Class partitioner)
throws IOException {
job.setOutputFormatClass(TableOutputFormat.class);
job.setReducerClass(reducer);
if (reducer != null) job.setReducerClass(reducer);
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);