From 94165dbc02294ba7576fd7d3c10c2dce97a3c098 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Tue, 30 Jun 2009 17:42:53 +0000 Subject: [PATCH] HBASE-1385 Revamp TableInputFormat, needs updating to match hadoop 0.20.x AND remove bit where we can make < maps than regions git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@789846 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 3 + .../org/apache/hadoop/hbase/client/Scan.java | 106 +++++- .../hbase/mapreduce/BuildTableIndex.java | 115 +++--- .../hbase/mapreduce/GroupingTableMap.java | 158 -------- .../hbase/mapreduce/HRegionPartitioner.java | 82 +++-- .../hbase/mapreduce/IdentityTableMap.java | 74 ---- .../hbase/mapreduce/IdentityTableReduce.java | 59 --- .../hbase/mapreduce/IndexConfiguration.java | 3 +- .../hbase/mapreduce/IndexOutputFormat.java | 100 ++---- .../hbase/mapreduce/IndexTableReduce.java | 109 ------ .../mapreduce/LuceneDocumentWrapper.java | 2 + .../hadoop/hbase/mapreduce/RowCounter.java | 126 ++++--- .../hbase/mapreduce/TableInputFormat.java | 82 +++-- .../hbase/mapreduce/TableInputFormatBase.java | 338 +++++++++--------- .../hadoop/hbase/mapreduce/TableMap.java | 38 -- .../hbase/mapreduce/TableMapReduceUtil.java | 138 +++---- .../hbase/mapreduce/TableOutputFormat.java | 90 +++-- .../hadoop/hbase/mapreduce/TableReduce.java | 38 -- .../hadoop/hbase/mapreduce/TableSplit.java | 140 ++++++-- .../hadoop/hbase/mapreduce/package-info.java | 8 +- 20 files changed, 744 insertions(+), 1065 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 574fcc9a658..90c9b0c6db5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -421,6 +421,9 @@ Release 0.20.0 - Unreleased HBASE-1587 Update ganglia config and doc to account for ganglia 3.1 and hadoop-4675 HBASE-1589 Up zk maxClientCnxns from default of 10 to 20 or 30 or so + HBASE-1385 Revamp TableInputFormat, needs updating to match hadoop 0.20.x + AND remove bit where we can make < maps than regions + (Lars George via Stack) OPTIMIZATIONS HBASE-1412 Change values for delete column and column family in KeyValue diff --git a/src/java/org/apache/hadoop/hbase/client/Scan.java b/src/java/org/apache/hadoop/hbase/client/Scan.java index 398eadc7922..e3770403045 100644 --- a/src/java/org/apache/hadoop/hbase/client/Scan.java +++ b/src/java/org/apache/hadoop/hbase/client/Scan.java @@ -108,6 +108,33 @@ public class Scan implements Writable { this.stopRow = stopRow; } + /** + * Creates a new instance of this class while copying all values. + * + * @param scan The scan instance to copy from. + * @throws IOException When copying the values fails. + */ + public Scan(Scan scan) throws IOException { + startRow = scan.getStartRow(); + stopRow = scan.getStopRow(); + maxVersions = scan.getMaxVersions(); + filter = scan.getFilter(); // clone? + oldFilter = scan.getOldFilter(); // clone? + TimeRange ctr = scan.getTimeRange(); + tr = new TimeRange(ctr.getMin(), ctr.getMax()); + Map> fams = scan.getFamilyMap(); + for (byte[] fam : fams.keySet()) { + NavigableSet cols = fams.get(fam); + if (cols != null && cols.size() > 0) { + for (byte[] col : cols) { + addColumn(fam, col); + } + } else { + addFamily(fam); + } + } + } + /** * Get all columns from the specified family. *

@@ -137,26 +164,89 @@ public class Scan implements Writable { return this; } + + /** + * Parses a combined family and qualifier and adds either both or just the + * family in case there is not qualifier. This assumes the older colon + * divided notation, e.g. "data:contents" or "meta:". + *

+ * Note: It will throw an error when the colon is missing. + * + * @param familyAndQualifier + * @return A reference to this instance. + * @throws IllegalArgumentException When the colon is missing. + */ + public Scan addColumn(byte[] familyAndQualifier) { + byte[][] fq = KeyValue.parseColumn(familyAndQualifier); + if (fq[1].length > 0) { + addColumn(fq[0], fq[1]); + } else { + addFamily(fq[0]); + } + return this; + } /** * Adds an array of columns specified in the old format, family:qualifier. *
* Overrides previous calls to addFamily for any families in the input. + * + * @param columns array of columns, formatted as family:qualifier
*/ public Scan addColumns(byte [][] columns) { for(int i=0; i> e : + familyMap.entrySet()) { + byte[] fam = e.getKey(); + if (cols.length() > 0) cols += " "; + NavigableSet quals = e.getValue(); + // check if this family has qualifiers + if (quals != null && quals.size() > 0) { + String cs = ""; + for (byte[] qual : quals) { + if (cs.length() > 0) cs += " "; + // encode values to make parsing easier later + cs += Bytes.toStringBinary(fam) + ":" + Bytes.toStringBinary(qual); + } + cols += cs; + } else { + // only add the family but with old style delimiter + cols += Bytes.toStringBinary(fam) + ":"; + } + } + return cols; + } /** * Get versions of columns only within the specified timestamp range, diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/BuildTableIndex.java b/src/java/org/apache/hadoop/hbase/mapreduce/BuildTableIndex.java index 6aa2284ba22..de39a158802 100644 --- a/src/java/org/apache/hadoop/hbase/mapreduce/BuildTableIndex.java +++ b/src/java/org/apache/hadoop/hbase/mapreduce/BuildTableIndex.java @@ -27,9 +27,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.GenericOptionsParser; /** * Example table column indexing class. Runs a mapreduce job to index @@ -51,32 +52,36 @@ import org.apache.hadoop.mapred.JobConf; * */ public class BuildTableIndex { + private static final String USAGE = "Usage: BuildTableIndex " + - "-m -r \n -indexConf " + - "-indexDir \n -table -columns " + + "-r -indexConf \n" + + "-indexDir -table \n -columns " + "[ ...]"; + /** + * Prints the usage message and exists the program. + * + * @param message The message to print first. + */ private static void printUsage(String message) { System.err.println(message); System.err.println(USAGE); System.exit(-1); } - /** default constructor */ - public BuildTableIndex() { - super(); - } - /** - * @param args - * @throws IOException + * Creates a new job. + * @param conf + * + * @param args The command line arguments. + * @throws IOException When reading the configuration fails. 
*/ - public void run(String[] args) throws IOException { + public static Job createSubmittableJob(Configuration conf, String[] args) + throws IOException { if (args.length < 6) { printUsage("Too few arguments"); } - int numMapTasks = 1; int numReduceTasks = 1; String iconfFile = null; String indexDir = null; @@ -85,9 +90,7 @@ public class BuildTableIndex { // parse args for (int i = 0; i < args.length - 1; i++) { - if ("-m".equals(args[i])) { - numMapTasks = Integer.parseInt(args[++i]); - } else if ("-r".equals(args[i])) { + if ("-r".equals(args[i])) { numReduceTasks = Integer.parseInt(args[++i]); } else if ("-indexConf".equals(args[i])) { iconfFile = args[++i]; @@ -111,7 +114,6 @@ public class BuildTableIndex { "be specified"); } - Configuration conf = new HBaseConfiguration(); if (iconfFile != null) { // set index configuration content from a file String content = readContent(iconfFile); @@ -121,52 +123,31 @@ public class BuildTableIndex { conf.set("hbase.index.conf", content); } - if (columnNames != null) { - JobConf jobConf = createJob(conf, numMapTasks, numReduceTasks, indexDir, - tableName, columnNames.toString()); - JobClient.runJob(jobConf); - } + Job job = new Job(conf, "build index for table " + tableName); + // number of indexes to partition into + job.setNumReduceTasks(numReduceTasks); + Scan scan = new Scan(); + scan.addColumns(columnNames.toString()); + // use identity map (a waste, but just as an example) + IdentityTableMapper.initJob(tableName, scan, + IdentityTableMapper.class, job); + // use IndexTableReduce to build a Lucene index + job.setReducerClass(IndexTableReducer.class); + FileOutputFormat.setOutputPath(job, new Path(indexDir)); + job.setOutputFormatClass(IndexOutputFormat.class); + return job; } /** - * @param conf - * @param numMapTasks - * @param numReduceTasks - * @param indexDir - * @param tableName - * @param columnNames - * @return JobConf - */ - public JobConf createJob(Configuration conf, int numMapTasks, - int numReduceTasks, String indexDir, String tableName, - String columnNames) { - JobConf jobConf = new JobConf(conf, BuildTableIndex.class); - jobConf.setJobName("build index for table " + tableName); - jobConf.setNumMapTasks(numMapTasks); - // number of indexes to partition into - jobConf.setNumReduceTasks(numReduceTasks); - - // use identity map (a waste, but just as an example) - IdentityTableMap.initJob(tableName, columnNames, IdentityTableMap.class, - jobConf); - - // use IndexTableReduce to build a Lucene index - jobConf.setReducerClass(IndexTableReduce.class); - FileOutputFormat.setOutputPath(jobConf, new Path(indexDir)); - jobConf.setOutputFormat(IndexOutputFormat.class); - - return jobConf; - } - - /* - * Read xml file of indexing configurations. The xml format is similar to + * Reads xml file of indexing configurations. The xml format is similar to * hbase-default.xml and hadoop-default.xml. For an example configuration, - * see the createIndexConfContent method in TestTableIndex - * @param fileName File to read. - * @return XML configuration read from file - * @throws IOException + * see the createIndexConfContent method in TestTableIndex. + * + * @param fileName The file to read. + * @return XML configuration read from file. + * @throws IOException When the XML is broken. 
*/ - private String readContent(String fileName) throws IOException { + private static String readContent(String fileName) throws IOException { File file = new File(fileName); int length = (int) file.length(); if (length == 0) { @@ -195,11 +176,17 @@ public class BuildTableIndex { } /** - * @param args - * @throws IOException + * The main entry point. + * + * @param args The command line arguments. + * @throws Exception When running the job fails. */ - public static void main(String[] args) throws IOException { - BuildTableIndex build = new BuildTableIndex(); - build.run(args); + public static void main(String[] args) throws Exception { + HBaseConfiguration conf = new HBaseConfiguration(); + String[] otherArgs = + new GenericOptionsParser(conf, args).getRemainingArgs(); + Job job = createSubmittableJob(conf, otherArgs); + System.exit(job.waitForCompletion(true) ? 0 : 1); } + } \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMap.java b/src/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMap.java index 6916b10c4ae..e69de29bb2d 100644 --- a/src/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMap.java +++ b/src/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMap.java @@ -1,158 +0,0 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.util.ArrayList; - -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; - -/** - * Extract grouping columns from input record - */ -public class GroupingTableMap -extends MapReduceBase -implements TableMap { - - /** - * JobConf parameter to specify the columns used to produce the key passed to - * collect from the map phase - */ - public static final String GROUP_COLUMNS = - "hbase.mapred.groupingtablemap.columns"; - - protected byte [][] m_columns; - - /** - * Use this before submitting a TableMap job. It will appropriately set up the - * JobConf. 
- * - * @param table table to be processed - * @param columns space separated list of columns to fetch - * @param groupColumns space separated list of columns used to form the key - * used in collect - * @param mapper map class - * @param job job configuration object - */ - @SuppressWarnings("unchecked") - public static void initJob(String table, String columns, String groupColumns, - Class mapper, JobConf job) { - - TableMapReduceUtil.initTableMapJob(table, columns, mapper, - ImmutableBytesWritable.class, Result.class, job); - job.set(GROUP_COLUMNS, groupColumns); - } - - @Override - public void configure(JobConf job) { - super.configure(job); - String[] cols = job.get(GROUP_COLUMNS, "").split(" "); - m_columns = new byte[cols.length][]; - for(int i = 0; i < cols.length; i++) { - m_columns[i] = Bytes.toBytes(cols[i]); - } - } - - /** - * Extract the grouping columns from value to construct a new key. - * - * Pass the new key and value to reduce. - * If any of the grouping columns are not found in the value, the record is skipped. - * @param key - * @param value - * @param output - * @param reporter - * @throws IOException - */ - public void map(ImmutableBytesWritable key, Result value, - OutputCollector output, - Reporter reporter) throws IOException { - - byte[][] keyVals = extractKeyValues(value); - if(keyVals != null) { - ImmutableBytesWritable tKey = createGroupKey(keyVals); - output.collect(tKey, value); - } - } - - /** - * Extract columns values from the current record. This method returns - * null if any of the columns are not found. - * - * Override this method if you want to deal with nulls differently. - * - * @param r - * @return array of byte values - */ - protected byte[][] extractKeyValues(Result r) { - byte[][] keyVals = null; - ArrayList foundList = new ArrayList(); - int numCols = m_columns.length; - if (numCols > 0) { - for (KeyValue value: r.list()) { - byte [] column = value.getColumn(); - for (int i = 0; i < numCols; i++) { - if (Bytes.equals(column, m_columns[i])) { - foundList.add(value.getValue()); - break; - } - } - } - if(foundList.size() == numCols) { - keyVals = foundList.toArray(new byte[numCols][]); - } - } - return keyVals; - } - - /** - * Create a key by concatenating multiple column values. - * Override this function in order to produce different types of keys. 
- * - * @param vals - * @return key generated by concatenating multiple column values - */ - protected ImmutableBytesWritable createGroupKey(byte[][] vals) { - if(vals == null) { - return null; - } - StringBuilder sb = new StringBuilder(); - for(int i = 0; i < vals.length; i++) { - if(i > 0) { - sb.append(" "); - } - try { - sb.append(new String(vals[i], HConstants.UTF8_ENCODING)); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); - } - } - return new ImmutableBytesWritable(Bytes.toBytes(sb.toString())); - } -} diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java b/src/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java index 81a5694af77..2808d67854e 100644 --- a/src/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java +++ b/src/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java @@ -23,44 +23,47 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Partitioner; +import org.apache.hadoop.mapreduce.Partitioner; /** * This is used to partition the output keys into groups of keys. * Keys are grouped according to the regions that currently exist * so that each reducer fills a single region so load is distributed. * - * @param - * @param + * @param The type of the key. + * @param The type of the value. */ -public class HRegionPartitioner -implements Partitioner { +public class HRegionPartitioner +extends Partitioner +implements Configurable { + private final Log LOG = LogFactory.getLog(TableInputFormat.class); + private Configuration conf = null; private HTable table; private byte[][] startKeys; - public void configure(JobConf job) { - try { - this.table = new HTable(new HBaseConfiguration(job), - job.get(TableOutputFormat.OUTPUT_TABLE)); - } catch (IOException e) { - LOG.error(e); - } - - try { - this.startKeys = this.table.getStartKeys(); - } catch (IOException e) { - LOG.error(e); - } - } - + /** + * Gets the partition number for a given key (hence record) given the total + * number of partitions i.e. number of reduce-tasks for the job. + * + *

Typically a hash function on all or a subset of the key.
+ * + * @param key The key to be partitioned. + * @param value The entry value. + * @param numPartitions The total number of partitions. + * @return The partition number for the key. + * @see org.apache.hadoop.mapreduce.Partitioner#getPartition( + * java.lang.Object, java.lang.Object, int) + */ + @Override public int getPartition(ImmutableBytesWritable key, - V2 value, int numPartitions) { + VALUE value, int numPartitions) { byte[] region = null; // Only one region return 0 if (this.startKeys.length == 1){ @@ -86,4 +89,39 @@ implements Partitioner { // if above fails to find start key that match we need to return something return 0; } + + /** + * Returns the current configuration. + * + * @return The current configuration. + * @see org.apache.hadoop.conf.Configurable#getConf() + */ + @Override + public Configuration getConf() { + return conf; + } + + /** + * Sets the configuration. This is used to determine the start keys for the + * given table. + * + * @param configuration The configuration to set. + * @see org.apache.hadoop.conf.Configurable#setConf( + * org.apache.hadoop.conf.Configuration) + */ + @Override + public void setConf(Configuration configuration) { + this.conf = configuration; + try { + this.table = new HTable(new HBaseConfiguration(conf), + configuration.get(TableOutputFormat.OUTPUT_TABLE)); + } catch (IOException e) { + LOG.error(e); + } + try { + this.startKeys = this.table.getStartKeys(); + } catch (IOException e) { + LOG.error(e); + } + } } diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMap.java b/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMap.java index dc5de1948bb..e69de29bb2d 100644 --- a/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMap.java +++ b/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMap.java @@ -1,74 +0,0 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; - -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; - -/** - * Pass the given key and record as-is to reduce - */ -public class IdentityTableMap -extends MapReduceBase -implements TableMap { - - /** constructor */ - public IdentityTableMap() { - super(); - } - - /** - * Use this before submitting a TableMap job. It will - * appropriately set up the JobConf. 
- * - * @param table table name - * @param columns columns to scan - * @param mapper mapper class - * @param job job configuration - */ - @SuppressWarnings("unchecked") - public static void initJob(String table, String columns, - Class mapper, JobConf job) { - TableMapReduceUtil.initTableMapJob(table, columns, mapper, - ImmutableBytesWritable.class, Result.class, job); - } - - /** - * Pass the key, value to reduce - * @param key - * @param value - * @param output - * @param reporter - * @throws IOException - */ - public void map(ImmutableBytesWritable key, Result value, - OutputCollector output, - Reporter reporter) throws IOException { - - // convert - output.collect(key, value); - } -} diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReduce.java b/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReduce.java index 01a31748e8e..e69de29bb2d 100644 --- a/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReduce.java +++ b/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReduce.java @@ -1,59 +0,0 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; - -/** - * Write to table each key, record pair - */ -public class IdentityTableReduce -extends MapReduceBase -implements TableReduce { - @SuppressWarnings("unused") - private static final Log LOG = - LogFactory.getLog(IdentityTableReduce.class.getName()); - - /** - * No aggregation, output pairs of (key, record) - * @param key - * @param values - * @param output - * @param reporter - * @throws IOException - */ - public void reduce(ImmutableBytesWritable key, Iterator values, - OutputCollector output, Reporter reporter) - throws IOException { - - while(values.hasNext()) { - output.collect(key, values.next()); - } - } -} diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/IndexConfiguration.java b/src/java/org/apache/hadoop/hbase/mapreduce/IndexConfiguration.java index 60e3646f8f4..9d5cb9d1ea7 100644 --- a/src/java/org/apache/hadoop/hbase/mapreduce/IndexConfiguration.java +++ b/src/java/org/apache/hadoop/hbase/mapreduce/IndexConfiguration.java @@ -44,9 +44,10 @@ import org.w3c.dom.NodeList; import org.w3c.dom.Text; /** - * Configuration parameters for building a Lucene index + * Configuration parameters for building a Lucene index. 
*/ public class IndexConfiguration extends Configuration { + private static final Log LOG = LogFactory.getLog(IndexConfiguration.class); static final String HBASE_COLUMN_NAME = "hbase.column.name"; diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/IndexOutputFormat.java b/src/java/org/apache/hadoop/hbase/mapreduce/IndexOutputFormat.java index b11da9c91f3..c43a71dc155 100644 --- a/src/java/org/apache/hadoop/hbase/mapreduce/IndexOutputFormat.java +++ b/src/java/org/apache/hadoop/hbase/mapreduce/IndexOutputFormat.java @@ -27,13 +27,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.document.Document; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.search.Similarity; @@ -41,29 +38,40 @@ import org.apache.lucene.search.Similarity; * Create a local index, unwrap Lucene documents created by reduce, add them to * the index, and copy the index to the destination. */ -public class IndexOutputFormat extends - FileOutputFormat { +public class IndexOutputFormat +extends FileOutputFormat { + static final Log LOG = LogFactory.getLog(IndexOutputFormat.class); + /** Random generator. */ private Random random = new Random(); + /** + * Returns the record writer. + * + * @param context The current task context. + * @return The record writer. + * @throws IOException When there is an issue with the writer. 
+ * @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext) + */ @Override public RecordWriter - getRecordWriter(final FileSystem fs, JobConf job, String name, - final Progressable progress) + getRecordWriter(TaskAttemptContext context) throws IOException { - final Path perm = new Path(FileOutputFormat.getOutputPath(job), name); - final Path temp = job.getLocalPath("index/_" - + Integer.toString(random.nextInt())); + final Path perm = new Path(FileOutputFormat.getOutputPath(context), + FileOutputFormat.getUniqueFile(context, "part", "")); + // null for "dirsProp" means no predefined directories + final Path temp = context.getConfiguration().getLocalPath( + "mapred.local.dir", "index/_" + Integer.toString(random.nextInt())); LOG.info("To index into " + perm); - + FileSystem fs = FileSystem.get(context.getConfiguration()); // delete old, if any fs.delete(perm, true); final IndexConfiguration indexConf = new IndexConfiguration(); - String content = job.get("hbase.index.conf"); + String content = context.getConfiguration().get("hbase.index.conf"); if (content != null) { indexConf.addFromXML(content); } @@ -99,65 +107,7 @@ public class IndexOutputFormat extends } } writer.setUseCompoundFile(indexConf.isUseCompoundFile()); - - return new RecordWriter() { - boolean closed; - private long docCount = 0; - - public void write(ImmutableBytesWritable key, - LuceneDocumentWrapper value) - throws IOException { - // unwrap and index doc - Document doc = value.get(); - writer.addDocument(doc); - docCount++; - progress.progress(); - } - - public void close(final Reporter reporter) throws IOException { - // spawn a thread to give progress heartbeats - Thread prog = new Thread() { - @Override - public void run() { - while (!closed) { - try { - reporter.setStatus("closing"); - Thread.sleep(1000); - } catch (InterruptedException e) { - continue; - } catch (Throwable e) { - return; - } - } - } - }; - - try { - prog.start(); - - // optimize index - if (indexConf.doOptimize()) { - if (LOG.isInfoEnabled()) { - LOG.info("Optimizing index."); - } - writer.optimize(); - } - - // close index - writer.close(); - if (LOG.isInfoEnabled()) { - LOG.info("Done indexing " + docCount + " docs."); - } - - // copy to perm destination in dfs - fs.completeLocalOutput(perm, temp); - if (LOG.isInfoEnabled()) { - LOG.info("Copy done."); - } - } finally { - closed = true; - } - } - }; + return new IndexRecordWriter(context, fs, writer, indexConf, perm, temp); } + } diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/IndexTableReduce.java b/src/java/org/apache/hadoop/hbase/mapreduce/IndexTableReduce.java index 3b1ec1bb967..e69de29bb2d 100644 --- a/src/java/org/apache/hadoop/hbase/mapreduce/IndexTableReduce.java +++ b/src/java/org/apache/hadoop/hbase/mapreduce/IndexTableReduce.java @@ -1,109 +0,0 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.Field; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - * Construct a Lucene document per row, which is consumed by IndexOutputFormat - * to build a Lucene index - */ -public class IndexTableReduce extends MapReduceBase implements - Reducer { - private static final Log LOG = LogFactory.getLog(IndexTableReduce.class); - private IndexConfiguration indexConf; - - @Override - public void configure(JobConf job) { - super.configure(job); - indexConf = new IndexConfiguration(); - String content = job.get("hbase.index.conf"); - if (content != null) { - indexConf.addFromXML(content); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Index conf: " + indexConf); - } - } - - @Override - public void close() throws IOException { - super.close(); - } - - public void reduce(ImmutableBytesWritable key, Iterator values, - OutputCollector output, - Reporter reporter) - throws IOException { - if (!values.hasNext()) { - return; - } - - Document doc = new Document(); - - // index and store row key, row key already UTF-8 encoded - Field keyField = new Field(indexConf.getRowkeyName(), - Bytes.toString(key.get(), key.getOffset(), key.getLength()), - Field.Store.YES, Field.Index.UN_TOKENIZED); - keyField.setOmitNorms(true); - doc.add(keyField); - - while (values.hasNext()) { - Result value = values.next(); - - // each column (name-value pair) is a field (name-value pair) - for (KeyValue kv: value.list()) { - // name is already UTF-8 encoded - String column = Bytes.toString(kv.getColumn()); - byte[] columnValue = kv.getValue(); - Field.Store store = indexConf.isStore(column)? - Field.Store.YES: Field.Store.NO; - Field.Index index = indexConf.isIndex(column)? - (indexConf.isTokenize(column)? 
- Field.Index.TOKENIZED: Field.Index.UN_TOKENIZED): - Field.Index.NO; - - // UTF-8 encode value - Field field = new Field(column, Bytes.toString(columnValue), - store, index); - field.setBoost(indexConf.getBoost(column)); - field.setOmitNorms(indexConf.isOmitNorms(column)); - - doc.add(field); - } - } - output.collect(key, new LuceneDocumentWrapper(doc)); - } -} diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/LuceneDocumentWrapper.java b/src/java/org/apache/hadoop/hbase/mapreduce/LuceneDocumentWrapper.java index 5bd9b3ec9bd..6aa7cdd182d 100644 --- a/src/java/org/apache/hadoop/hbase/mapreduce/LuceneDocumentWrapper.java +++ b/src/java/org/apache/hadoop/hbase/mapreduce/LuceneDocumentWrapper.java @@ -29,6 +29,8 @@ import org.apache.lucene.document.Document; * It doesn't really serialize/deserialize a lucene document. */ public class LuceneDocumentWrapper implements Writable { + + /** The document to add to the index. */ protected Document doc; /** diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java b/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java index 9ca94ad6c81..b4fae725c82 100644 --- a/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java +++ b/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java @@ -21,73 +21,71 @@ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; -import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.lib.IdentityReducer; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.GenericOptionsParser; /** - * A job with a map to count rows. - * Map outputs table rows IF the input row has columns that have content. - * Uses an {@link IdentityReducer} + * A job with a just a map phase to count rows. Map outputs table rows IF the + * input row has columns that have content. */ -public class RowCounter extends Configured implements Tool { - // Name of this 'program' +public class RowCounter { + + /** Name of this 'program'. */ static final String NAME = "rowcounter"; /** * Mapper that runs the count. */ static class RowCounterMapper - implements TableMap { - private static enum Counters {ROWS} + extends TableMapper { + + /** Counter enumeration to count the actual rows. */ + private static enum Counters { ROWS } + /** + * Maps the data. + * + * @param row The current table row key. + * @param values The columns. + * @param context The current context. + * @throws IOException When something is broken with the data. 
+ * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN, + * org.apache.hadoop.mapreduce.Mapper.Context) + */ + @Override public void map(ImmutableBytesWritable row, Result values, - OutputCollector output, - Reporter reporter) + Context context) throws IOException { - boolean content = !values.list().isEmpty(); for (KeyValue value: values.list()) { if (value.getValue().length > 0) { - content = true; + context.getCounter(Counters.ROWS).increment(1); break; } } - if (!content) { - // Don't count rows that are all empty values. - return; - } - // Give out same value every time. We're only interested in the row/key - reporter.incrCounter(Counters.ROWS, 1); } - public void configure(JobConf jc) { - // Nothing to do. - } - - public void close() throws IOException { - // Nothing to do. - } } /** - * @param args - * @return the JobConf - * @throws IOException + * Sets up the actual job. + * + * @param conf The current configuration. + * @param args The command line parameters. + * @return The newly created job. + * @throws IOException When setting up the job fails. */ - public JobConf createSubmittableJob(String[] args) throws IOException { - JobConf c = new JobConf(getConf(), getClass()); - c.setJobName(NAME); + public static Job createSubmittableJob(Configuration conf, String[] args) + throws IOException { + Job job = new Job(conf, NAME); + job.setJarByClass(RowCounter.class); // Columns are space delimited StringBuilder sb = new StringBuilder(); final int columnoffset = 2; @@ -97,38 +95,34 @@ public class RowCounter extends Configured implements Tool { } sb.append(args[i]); } + Scan scan = new Scan(); + scan.addColumns(sb.toString()); // Second argument is the table name. - TableMapReduceUtil.initTableMapJob(args[1], sb.toString(), - RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, c); - c.setNumReduceTasks(0); - // First arg is the output directory. - FileOutputFormat.setOutputPath(c, new Path(args[0])); - return c; - } - - static int printUsage() { - System.out.println(NAME + - " [...]"); - return -1; - } - - public int run(final String[] args) throws Exception { - // Make sure there are at least 3 parameters - if (args.length < 3) { - System.err.println("ERROR: Wrong number of parameters: " + args.length); - return printUsage(); - } - JobClient.runJob(createSubmittableJob(args)); - return 0; + TableMapReduceUtil.initTableMapperJob(args[1], scan, + RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job); + job.setNumReduceTasks(0); + // first argument is the output directory. + FileOutputFormat.setOutputPath(job, new Path(args[0])); + return job; } /** - * @param args - * @throws Exception + * Main entry point. + * + * @param args The command line parameters. + * @throws Exception When running the job fails. */ public static void main(String[] args) throws Exception { - HBaseConfiguration c = new HBaseConfiguration(); - int errCode = ToolRunner.run(c, new RowCounter(), args); - System.exit(errCode); + HBaseConfiguration conf = new HBaseConfiguration(); + String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); + if (otherArgs.length < 3) { + System.err.println("ERROR: Wrong number of parameters: " + args.length); + System.err.println("Usage: " + NAME + + " [...]"); + System.exit(-1); + } + Job job = createSubmittableJob(conf, otherArgs); + System.exit(job.waitForCompletion(true) ? 
0 : 1); } + } \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java index 790a2cf7506..940851a887e 100644 --- a/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java +++ b/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java @@ -23,60 +23,64 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.util.StringUtils; /** * Convert HBase tabular data into a format that is consumable by Map/Reduce. */ -public class TableInputFormat extends TableInputFormatBase implements - JobConfigurable { +public class TableInputFormat extends TableInputFormatBase +implements Configurable { + private final Log LOG = LogFactory.getLog(TableInputFormat.class); + + /** Job parameter that specifies the output table. */ + public static final String INPUT_TABLE = "hbase.mapreduce.inputtable"; + /** Space delimited list of columns. */ + public static final String SCAN = "hbase.mapreduce.scan"; + + /** The configuration. */ + private Configuration conf = null; /** - * space delimited list of columns + * Returns the current configuration. + * + * @return The current configuration. + * @see org.apache.hadoop.conf.Configurable#getConf() */ - public static final String COLUMN_LIST = "hbase.mapred.tablecolumns"; + @Override + public Configuration getConf() { + return conf; + } - public void configure(JobConf job) { - Path[] tableNames = FileInputFormat.getInputPaths(job); - String colArg = job.get(COLUMN_LIST); - String[] colNames = colArg.split(" "); - byte [][] m_cols = new byte[colNames.length][]; - for (int i = 0; i < m_cols.length; i++) { - m_cols[i] = Bytes.toBytes(colNames[i]); - } - setInputColumns(m_cols); + /** + * Sets the configuration. This is used to set the details for the table to + * be scanned. + * + * @param configuration The configuration to set. + * @see org.apache.hadoop.conf.Configurable#setConf( + * org.apache.hadoop.conf.Configuration) + */ + @Override + public void setConf(Configuration configuration) { + this.conf = configuration; + String tableName = conf.get(INPUT_TABLE); try { - setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName())); + setHTable(new HTable(new HBaseConfiguration(conf), tableName)); } catch (Exception e) { LOG.error(StringUtils.stringifyException(e)); } + Scan scan = null; + try { + scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN)); + } catch (IOException e) { + LOG.error("An error occurred.", e); + } + setScan(scan); } - - public void validateInput(JobConf job) throws IOException { - // expecting exactly one path - Path [] tableNames = FileInputFormat.getInputPaths(job); - if (tableNames == null || tableNames.length > 1) { - throw new IOException("expecting one table name"); - } - - // connected to table? 
- if (getHTable() == null) { - throw new IOException("could not connect to table '" + - tableNames[0].getName() + "'"); - } - - // expecting at least one column - String colArg = job.get(COLUMN_LIST); - if (colArg == null || colArg.length() == 0) { - throw new IOException("expecting at least one column"); - } - } + } diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java index dcd86028314..0f6a30fc41e 100644 --- a/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java +++ b/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java @@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; -import java.util.HashSet; -import java.util.Set; +import java.util.Arrays; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,23 +31,19 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.filter.RowFilterInterface; -import org.apache.hadoop.hbase.filter.StopRowFilter; -import org.apache.hadoop.hbase.filter.WhileMatchRowFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.Writables; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.util.StringUtils; /** - * A Base for {@link TableInputFormat}s. Receives a {@link HTable}, a - * byte[] of input columns and optionally a {@link RowFilterInterface}. - * Subclasses may use other TableRecordReader implementations. + * A base for {@link TableInputFormat}s. Receives a {@link HTable}, an + * {@link Scan} instance that defines the input columns etc. Subclasses may use + * other TableRecordReader implementations. *

* An example of a subclass: *
@@ -72,155 +68,146 @@ import org.apache.hadoop.util.StringUtils;
  *  }
  * 
*/ - public abstract class TableInputFormatBase -implements InputFormat { +extends InputFormat { + final Log LOG = LogFactory.getLog(TableInputFormatBase.class); - private byte [][] inputColumns; - private HTable table; - private TableRecordReader tableRecordReader; - private RowFilterInterface rowFilter; + + /** Holds the details for the internal scanner. */ + private Scan scan = null; + /** The table to scan. */ + private HTable table = null; + /** The reader scanning the table, can be a custom one. */ + private TableRecordReader tableRecordReader = null; /** - * Iterate over an HBase table data, return (Text, RowResult) pairs + * Iterate over an HBase table data, return (ImmutableBytesWritable, Result) + * pairs. */ protected class TableRecordReader - implements RecordReader { - private byte [] startRow; - private byte [] endRow; - private byte [] lastRow; - private RowFilterInterface trrRowFilter; - private ResultScanner scanner; - private HTable htable; - private byte [][] trrInputColumns; + extends RecordReader { + + private ResultScanner scanner = null; + private Scan scan = null; + private HTable htable = null; + private byte[] lastRow = null; + private ImmutableBytesWritable key = null; + private Result value = null; /** * Restart from survivable exceptions by creating a new scanner. * - * @param firstRow - * @throws IOException + * @param firstRow The first row to start at. + * @throws IOException When restarting fails. */ public void restart(byte[] firstRow) throws IOException { - if ((endRow != null) && (endRow.length > 0)) { - if (trrRowFilter != null) { - final Set rowFiltersSet = - new HashSet(); - rowFiltersSet.add(new WhileMatchRowFilter(new StopRowFilter(endRow))); - rowFiltersSet.add(trrRowFilter); - Scan scan = new Scan(startRow); - scan.addColumns(trrInputColumns); -// scan.setFilter(new RowFilterSet(RowFilterSet.Operator.MUST_PASS_ALL, -// rowFiltersSet)); - this.scanner = this.htable.getScanner(scan); - } else { - Scan scan = new Scan(firstRow, endRow); - scan.addColumns(trrInputColumns); - this.scanner = this.htable.getScanner(scan); - } - } else { - Scan scan = new Scan(firstRow); - scan.addColumns(trrInputColumns); -// scan.setFilter(trrRowFilter); - this.scanner = this.htable.getScanner(scan); - } + Scan newScan = new Scan(scan); + newScan.setStartRow(firstRow); + this.scanner = this.htable.getScanner(newScan); } /** * Build the scanner. Not done in constructor to allow for extension. * - * @throws IOException + * @throws IOException When restarting the scan fails. */ public void init() throws IOException { - restart(startRow); + restart(scan.getStartRow()); } /** - * @param htable the {@link HTable} to scan. + * Sets the HBase table. + * + * @param htable The {@link HTable} to scan. */ public void setHTable(HTable htable) { this.htable = htable; } /** - * @param inputColumns the columns to be placed in {@link Result}. + * Sets the scan defining the actual details like columns etc. + * + * @param scan The scan to set. */ - public void setInputColumns(final byte [][] inputColumns) { - this.trrInputColumns = inputColumns; + public void setScan(Scan scan) { + this.scan = scan; } /** - * @param startRow the first row in the split + * Closes the split. 
+ * + * @see org.apache.hadoop.mapreduce.RecordReader#close() */ - public void setStartRow(final byte [] startRow) { - this.startRow = startRow; - } - - /** - * - * @param endRow the last row in the split - */ - public void setEndRow(final byte [] endRow) { - this.endRow = endRow; - } - - /** - * @param rowFilter the {@link RowFilterInterface} to be used. - */ - public void setRowFilter(RowFilterInterface rowFilter) { - this.trrRowFilter = rowFilter; - } - + @Override public void close() { this.scanner.close(); } /** - * @return ImmutableBytesWritable - * - * @see org.apache.hadoop.mapred.RecordReader#createKey() - */ - public ImmutableBytesWritable createKey() { - return new ImmutableBytesWritable(); - } - - /** - * @return RowResult - * - * @see org.apache.hadoop.mapred.RecordReader#createValue() - */ - public Result createValue() { - return new Result(); - } - - public long getPos() { - // This should be the ordinal tuple in the range; - // not clear how to calculate... - return 0; - } - - public float getProgress() { - // Depends on the total number of tuples and getPos - return 0; - } - - /** - * @param key HStoreKey as input key. - * @param value MapWritable as input value - * @return true if there was more data + * Returns the current key. + * + * @return The current key. * @throws IOException + * @throws InterruptedException When the job is aborted. + * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey() */ - public boolean next(ImmutableBytesWritable key, Result value) - throws IOException { + @Override + public ImmutableBytesWritable getCurrentKey() throws IOException, + InterruptedException { + return key; + } + + /** + * Returns the current value. + * + * @return The current value. + * @throws IOException When the value is faulty. + * @throws InterruptedException When the job is aborted. + * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue() + */ + @Override + public Result getCurrentValue() throws IOException, InterruptedException { + return value; + } + + /** + * Initializes the reader. + * + * @param inputsplit The split to work with. + * @param context The current task context. + * @throws IOException When setting up the reader fails. + * @throws InterruptedException When the job is aborted. + * @see org.apache.hadoop.mapreduce.RecordReader#initialize( + * org.apache.hadoop.mapreduce.InputSplit, + * org.apache.hadoop.mapreduce.TaskAttemptContext) + */ + @Override + public void initialize(InputSplit inputsplit, + TaskAttemptContext context) throws IOException, + InterruptedException { + } + + /** + * Positions the record reader to the next record. + * + * @return true if there was another record. + * @throws IOException When reading the record failed. + * @throws InterruptedException When the job was aborted. 
+ * @see org.apache.hadoop.mapreduce.RecordReader#nextKeyValue() + */ + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (key == null) key = new ImmutableBytesWritable(); + if (value == null) value = new Result(); Result result; try { result = this.scanner.next(); } catch (UnknownScannerException e) { LOG.debug("recovered from " + StringUtils.stringifyException(e)); restart(lastRow); - this.scanner.next(); // skip presumed already mapped row - result = this.scanner.next(); + scanner.next(); // skip presumed already mapped row + result = scanner.next(); } - if (result != null && result.size() > 0) { key.set(result.getRow()); lastRow = key.get(); @@ -229,17 +216,35 @@ implements InputFormat { } return false; } + + /** + * The current progress of the record reader through its data. + * + * @return A number between 0.0 and 1.0, the fraction of the data read. + * @see org.apache.hadoop.mapreduce.RecordReader#getProgress() + */ + @Override + public float getProgress() { + // Depends on the total number of tuples + return 0; + } } /** * Builds a TableRecordReader. If no TableRecordReader was provided, uses * the default. - * - * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit, - * JobConf, Reporter) + * + * @param split The split to work with. + * @param context The current context. + * @return The newly created record reader. + * @throws IOException When creating the reader fails. + * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader( + * org.apache.hadoop.mapreduce.InputSplit, + * org.apache.hadoop.mapreduce.TaskAttemptContext) */ - public RecordReader getRecordReader( - InputSplit split, JobConf job, Reporter reporter) + @Override + public RecordReader createRecordReader( + InputSplit split, TaskAttemptContext context) throws IOException { TableSplit tSplit = (TableSplit) split; TableRecordReader trr = this.tableRecordReader; @@ -247,45 +252,38 @@ implements InputFormat { if (trr == null) { trr = new TableRecordReader(); } - trr.setStartRow(tSplit.getStartRow()); - trr.setEndRow(tSplit.getEndRow()); - trr.setHTable(this.table); - trr.setInputColumns(this.inputColumns); - trr.setRowFilter(this.rowFilter); + Scan sc = new Scan(scan); + sc.setStartRow(tSplit.getStartRow()); + sc.setStopRow(tSplit.getEndRow()); + trr.setScan(sc); + trr.setHTable(table); trr.init(); return trr; } /** - * Calculates the splits that will serve as input for the map tasks. - *
    - * Splits are created in number equal to the smallest between numSplits and - * the number of {@link HRegion}s in the table. If the number of splits is - * smaller than the number of {@link HRegion}s then splits are spanned across - * multiple {@link HRegion}s and are grouped the most evenly possible. In the - * case splits are uneven the bigger splits are placed first in the - * {@link InputSplit} array. + * Calculates the splits that will serve as input for the map tasks. The + * number of splits matches the number of regions in a table. * - * @param job the map task {@link JobConf} - * @param numSplits a hint to calculate the number of splits (mapred.map.tasks). - * - * @return the input splits - * - * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int) + * @param context The current job context. + * @return The list of input splits. + * @throws IOException When creating the list of splits fails. + * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( + * org.apache.hadoop.mapreduce.JobContext) */ - public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - byte [][] startKeys = this.table.getStartKeys(); + @Override + public List getSplits(JobContext context) throws IOException { + byte [][] startKeys = table.getStartKeys(); if (startKeys == null || startKeys.length == 0) { - throw new IOException("Expecting at least one region"); + throw new IOException("Expecting at least one region."); } - if (this.table == null) { - throw new IOException("No table was provided"); + if (table == null) { + throw new IOException("No table was provided."); } - if (this.inputColumns == null || this.inputColumns.length == 0) { - throw new IOException("Expecting at least one column"); + if (!scan.hasFamilies()) { + throw new IOException("Expecting at least one column."); } - int realNumSplits = numSplits > startKeys.length? startKeys.length: - numSplits; + int realNumSplits = startKeys.length; InputSplit[] splits = new InputSplit[realNumSplits]; int middle = startKeys.length / realNumSplits; int startPos = 0; @@ -300,14 +298,7 @@ implements InputFormat { LOG.info("split: " + i + "->" + splits[i]); startPos = lastPos; } - return splits; - } - - /** - * @param inputColumns to be passed in {@link Result} to the map task. - */ - protected void setInputColumns(byte [][] inputColumns) { - this.inputColumns = inputColumns; + return Arrays.asList(splits); } /** @@ -320,28 +311,39 @@ implements InputFormat { /** * Allows subclasses to set the {@link HTable}. * - * @param table to get the data from + * @param table The table to get the data from. */ protected void setHTable(HTable table) { this.table = table; } + /** + * Gets the scan defining the actual details like columns etc. + * + * @return The internal scan instance. + */ + public Scan getScan() { + if (scan == null) scan = new Scan(); + return scan; + } + + /** + * Sets the scan defining the actual details like columns etc. + * + * @param scan The scan to set. + */ + public void setScan(Scan scan) { + this.scan = scan; + } + /** * Allows subclasses to set the {@link TableRecordReader}. * - * @param tableRecordReader - * to provide other {@link TableRecordReader} implementations. + * @param tableRecordReader A different {@link TableRecordReader} + * implementation. */ protected void setTableRecordReader(TableRecordReader tableRecordReader) { this.tableRecordReader = tableRecordReader; } - /** - * Allows subclasses to set the {@link RowFilterInterface} to be used. 
- * - * @param rowFilter - */ - protected void setRowFilter(RowFilterInterface rowFilter) { - this.rowFilter = rowFilter; - } } diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/TableMap.java b/src/java/org/apache/hadoop/hbase/mapreduce/TableMap.java index cabb60a5b70..e69de29bb2d 100644 --- a/src/java/org/apache/hadoop/hbase/mapreduce/TableMap.java +++ b/src/java/org/apache/hadoop/hbase/mapreduce/TableMap.java @@ -1,38 +0,0 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.Mapper; - -/** - * Scan an HBase table to sort by a specified sort column. - * If the column does not exist, the record is not passed to Reduce. - * - * @param WritableComparable key class - * @param Writable value class - */ -public interface TableMap, V extends Writable> -extends Mapper { - -} diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java index 306b349d421..6e545662512 100644 --- a/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java +++ b/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java @@ -19,45 +19,80 @@ */ package org.apache.hadoop.hbase.mapreduce; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Base64; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; /** - * Utility for {@link TableMap} and {@link TableReduce} + * Utility for {@link TableMapper} and {@link TableReducer} */ @SuppressWarnings("unchecked") public class TableMapReduceUtil { /** - * Use this before submitting a TableMap job. It will - * appropriately set up the JobConf. + * Use this before submitting a TableMap job. It will appropriately set up + * the job. * * @param table The table name to read from. - * @param columns The columns to scan. + * @param scan The scan instance with the columns, time range etc. * @param mapper The mapper class to use. 
* @param outputKeyClass The class of the output key. * @param outputValueClass The class of the output value. - * @param job The current job configuration to adjust. + * @param job The current job to adjust. + * @throws IOException When setting up the details fails. */ - public static void initTableMapJob(String table, String columns, - Class mapper, - Class outputKeyClass, - Class outputValueClass, JobConf job) { - - job.setInputFormat(TableInputFormat.class); + public static void initTableMapperJob(String table, Scan scan, + Class mapper, + Class outputKeyClass, + Class outputValueClass, Job job) throws IOException { + job.setInputFormatClass(TableInputFormat.class); job.setMapOutputValueClass(outputValueClass); job.setMapOutputKeyClass(outputKeyClass); job.setMapperClass(mapper); - FileInputFormat.addInputPaths(job, table); - job.set(TableInputFormat.COLUMN_LIST, columns); + job.getConfiguration().set(TableInputFormat.INPUT_TABLE, table); + job.getConfiguration().set(TableInputFormat.SCAN, + convertScanToString(scan)); + } + + /** + * Writes the given scan into a Base64 encoded string. + * + * @param scan The scan to write out. + * @return The scan saved in a Base64 encoded string. + * @throws IOException When writing the scan fails. + */ + static String convertScanToString(Scan scan) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(out); + scan.write(dos); + return Base64.encodeBytes(out.toByteArray()); + } + + /** + * Converts the given Base64 string back into a Scan instance. + * + * @param base64 The scan details. + * @return The newly created Scan instance. + * @throws IOException When reading the scan instance fails. + */ + static Scan convertStringToScan(String base64) throws IOException { + ByteArrayInputStream bis = new ByteArrayInputStream(Base64.decode(base64)); + DataInputStream dis = new DataInputStream(bis); + Scan scan = new Scan(); + scan.readFields(dis); + return scan; } /** @@ -66,13 +101,13 @@ public class TableMapReduceUtil { * * @param table The output table. * @param reducer The reducer class to use. - * @param job The current job configuration to adjust. + * @param job The current job to adjust. * @throws IOException When determining the region count fails. */ - public static void initTableReduceJob(String table, - Class reducer, JobConf job) + public static void initTableReducerJob(String table, + Class reducer, Job job) throws IOException { - initTableReduceJob(table, reducer, job, null); + initTableReducerJob(table, reducer, job, null); } /** @@ -81,22 +116,23 @@ public class TableMapReduceUtil { * * @param table The output table. * @param reducer The reducer class to use. - * @param job The current job configuration to adjust. + * @param job The current job to adjust. * @param partitioner Partitioner to use. Pass null to use * default partitioner. * @throws IOException When determining the region count fails. 
*/ - public static void initTableReduceJob(String table, - Class reducer, JobConf job, Class partitioner) + public static void initTableReducerJob(String table, + Class reducer, Job job, Class partitioner) throws IOException { - job.setOutputFormat(TableOutputFormat.class); + job.setOutputFormatClass(TableOutputFormat.class); job.setReducerClass(reducer); - job.set(TableOutputFormat.OUTPUT_TABLE, table); + job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Put.class); if (partitioner == HRegionPartitioner.class) { job.setPartitionerClass(HRegionPartitioner.class); - HTable outputTable = new HTable(new HBaseConfiguration(job), table); + HTable outputTable = new HTable(new HBaseConfiguration( + job.getConfiguration()), table); int regions = outputTable.getRegionsInfo().size(); if (job.getNumReduceTasks() > regions) { job.setNumReduceTasks(outputTable.getRegionsInfo().size()); @@ -111,73 +147,45 @@ public class TableMapReduceUtil { * configuration does not exceed the number of regions for the given table. * * @param table The table to get the region count for. - * @param job The current job configuration to adjust. + * @param job The current job to adjust. * @throws IOException When retrieving the table details fails. */ - public static void limitNumReduceTasks(String table, JobConf job) + public static void limitNumReduceTasks(String table, Job job) throws IOException { - HTable outputTable = new HTable(new HBaseConfiguration(job), table); + HTable outputTable = new HTable(new HBaseConfiguration( + job.getConfiguration()), table); int regions = outputTable.getRegionsInfo().size(); if (job.getNumReduceTasks() > regions) job.setNumReduceTasks(regions); } - /** - * Ensures that the given number of map tasks for the given job - * configuration does not exceed the number of regions for the given table. - * - * @param table The table to get the region count for. - * @param job The current job configuration to adjust. - * @throws IOException When retrieving the table details fails. - */ - public static void limitNumMapTasks(String table, JobConf job) - throws IOException { - HTable outputTable = new HTable(new HBaseConfiguration(job), table); - int regions = outputTable.getRegionsInfo().size(); - if (job.getNumMapTasks() > regions) - job.setNumMapTasks(regions); - } - /** * Sets the number of reduce tasks for the given job configuration to the * number of regions the given table has. * * @param table The table to get the region count for. - * @param job The current job configuration to adjust. + * @param job The current job to adjust. * @throws IOException When retrieving the table details fails. */ - public static void setNumReduceTasks(String table, JobConf job) + public static void setNumReduceTasks(String table, Job job) throws IOException { - HTable outputTable = new HTable(new HBaseConfiguration(job), table); + HTable outputTable = new HTable(new HBaseConfiguration( + job.getConfiguration()), table); int regions = outputTable.getRegionsInfo().size(); job.setNumReduceTasks(regions); } - /** - * Sets the number of map tasks for the given job configuration to the - * number of regions the given table has. - * - * @param table The table to get the region count for. - * @param job The current job configuration to adjust. - * @throws IOException When retrieving the table details fails. 
-   */
-  public static void setNumMapTasks(String table, JobConf job) 
-  throws IOException {
-    HTable outputTable = new HTable(new HBaseConfiguration(job), table);
-    int regions = outputTable.getRegionsInfo().size();
-    job.setNumMapTasks(regions);
-  }
-  
   /**
    * Sets the number of rows to return and cache with each scanner iteration.
    * Higher caching values will enable faster mapreduce jobs at the expense of
    * requiring more heap to contain the cached rows.
    * 
-   * @param job The current job configuration to adjust.
+   * @param job The current job to adjust.
    * @param batchSize The number of rows to return in batch with each scanner
    * iteration.
    */
-  public static void setScannerCaching(JobConf job, int batchSize) {
-    job.setInt("hbase.client.scanner.caching", batchSize);
+  public static void setScannerCaching(Job job, int batchSize) {
+    job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
   }
+  
 }
\ No newline at end of file
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
index a3298d554f1..b21545c65c9 100644
--- a/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
@@ -23,68 +23,89 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.hadoop.mapred.InvalidJobConfException;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 /**
- * Convert Map/Reduce output and write it to an HBase table
+ * Convert Map/Reduce output and write it to an HBase table.
 */
 public class TableOutputFormat extends
    FileOutputFormat<ImmutableBytesWritable, Put> {
-  /** JobConf parameter that specifies the output table */
-  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
   private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
+  /** Job parameter that specifies the output table. */
+  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
 
   /**
    * Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable)
    * and write to an HBase table
    */
   protected static class TableRecordWriter
-    implements RecordWriter<ImmutableBytesWritable, Put> {
-    private HTable m_table;
+  extends RecordWriter<ImmutableBytesWritable, Put> {
+    
+    /** The table to write to. */
+    private HTable table;
 
     /**
      * Instantiate a TableRecordWriter with the HBase HClient for writing.
     *
-     * @param table
+     * @param table The table to write to.
     */
    public TableRecordWriter(HTable table) {
-      m_table = table;
+      this.table = table;
    }
 
-    public void close(Reporter reporter) 
-      throws IOException {
-      m_table.flushCommits();
+    /**
+     * Closes the writer, in this case flushing the table commits.
+     * 
+     * @param context The context.
+     * @throws IOException When closing the writer fails.
+     * @see org.apache.hadoop.mapreduce.RecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext)
+     */
+    @Override
+    public void close(TaskAttemptContext context) 
+    throws IOException {
+      table.flushCommits();
     }
 
-    public void write(ImmutableBytesWritable key,
-        Put value) throws IOException {
-      m_table.put(new Put(value));
+    /**
+     * Writes a key/value pair into the table.
+     * 
+     * @param key The key.
+     * @param value The value.
+     * @throws IOException When writing fails.
+     * @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object)
+     */
+    @Override
+    public void write(ImmutableBytesWritable key, Put value)
+    throws IOException {
+      table.put(new Put(value));
     }
   }
  
-  @Override
-  @SuppressWarnings("unchecked")
-  public RecordWriter getRecordWriter(FileSystem ignored,
-      JobConf job, String name, Progressable progress) throws IOException {
-    
+  /**
+   * Creates a new record writer.
+   * 
+   * @param context The current task context.
+   * @return The newly created writer instance.
+   * @throws IOException When creating the writer fails.
+   * @throws InterruptedException When the job is cancelled.
+   * @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
+   */
+  public RecordWriter<ImmutableBytesWritable, Put> getRecordWriter(
+      TaskAttemptContext context)
+  throws IOException, InterruptedException {
     // expecting exactly one path
-    
-    String tableName = job.get(OUTPUT_TABLE);
+    String tableName = context.getConfiguration().get(OUTPUT_TABLE);
     HTable table = null;
     try {
-      table = new HTable(new HBaseConfiguration(job), tableName);
+      table = new HTable(new HBaseConfiguration(context.getConfiguration()), 
+        tableName);
     } catch(IOException e) {
       LOG.error(e);
       throw e;
@@ -92,14 +113,5 @@ public class TableOutputFormat extends
     table.setAutoFlush(false);
     return new TableRecordWriter(table);
   }
-
-  @Override
-  public void checkOutputSpecs(FileSystem ignored, JobConf job)
-  throws FileAlreadyExistsException, InvalidJobConfException, IOException {
-
-    String tableName = job.get(OUTPUT_TABLE);
-    if(tableName == null) {
-      throw new IOException("Must specify table name");
-    }
-  }
+  
 }
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/TableReduce.java b/src/java/org/apache/hadoop/hbase/mapreduce/TableReduce.java
index 56380b6dd15..e69de29bb2d 100644
--- a/src/java/org/apache/hadoop/hbase/mapreduce/TableReduce.java
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/TableReduce.java
@@ -1,38 +0,0 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.Reducer;
-
-/**
- * Write a table, sorting by the input key
- *
- * @param <K> key class
- * @param <V> value class
- */
-@SuppressWarnings("unchecked")
-public interface TableReduce<K extends WritableComparable, V extends Writable>
-extends Reducer<K, V, ImmutableBytesWritable, Put> {
-
-}
\ No newline at end of file
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java b/src/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java
index 222ebdef442..9caca066942 100644
--- a/src/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java
@@ -25,88 +25,152 @@ import java.io.IOException;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
 
 /**
- * A table split corresponds to a key range [low, high)
+ * A table split corresponds to a key range [low, high). All references to row
+ * below refer to the key of the row.
 */
-public class TableSplit implements InputSplit, Comparable<TableSplit> {
-  private byte [] m_tableName;
-  private byte [] m_startRow;
-  private byte [] m_endRow;
-  private String m_regionLocation;
+public class TableSplit extends InputSplit
+implements Writable, Comparable<TableSplit> {
+  
+  private byte [] tableName;
+  private byte [] startRow;
+  private byte [] endRow;
+  private String regionLocation;
 
-  /** default constructor */
+  /** Default constructor. */
   public TableSplit() {
     this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
       HConstants.EMPTY_BYTE_ARRAY, "");
   }
 
   /**
-   * Constructor
-   * @param tableName
-   * @param startRow
-   * @param endRow
-   * @param location
+   * Creates a new instance while assigning all variables.
+   * 
+   * @param tableName The name of the current table.
+   * @param startRow The start row of the split.
+   * @param endRow The end row of the split.
+   * @param location The location of the region.
    */
   public TableSplit(byte [] tableName, byte [] startRow, byte [] endRow,
      final String location) {
-    this.m_tableName = tableName;
-    this.m_startRow = startRow;
-    this.m_endRow = endRow;
-    this.m_regionLocation = location;
+    this.tableName = tableName;
+    this.startRow = startRow;
+    this.endRow = endRow;
+    this.regionLocation = location;
   }
 
-  /** @return table name */
+  /**
+   * Returns the table name.
+   * 
+   * @return The table name.
+   */
   public byte [] getTableName() {
-    return this.m_tableName;
+    return tableName;
   }
 
-  /** @return starting row key */
+  /**
+   * Returns the start row.
+   * 
+   * @return The start row.
+   */
   public byte [] getStartRow() {
-    return this.m_startRow;
+    return startRow;
   }
 
-  /** @return end row key */
+  /**
+   * Returns the end row.
+   * 
+   * @return The end row.
+   */
   public byte [] getEndRow() {
-    return this.m_endRow;
+    return endRow;
   }
 
-  /** @return the region's hostname */
+  /**
+   * Returns the region location.
+   * 
+   * @return The region's location.
+   */
   public String getRegionLocation() {
-    return this.m_regionLocation;
+    return regionLocation;
   }
 
+  /**
+   * Returns the region's location as an array.
+   * 
+   * @return The array containing the region location.
+ * @see org.apache.hadoop.mapreduce.InputSplit#getLocations() + */ + @Override public String[] getLocations() { - return new String[] {this.m_regionLocation}; + return new String[] {regionLocation}; } + /** + * Returns the length of the split. + * + * @return The length of the split. + * @see org.apache.hadoop.mapreduce.InputSplit#getLength() + */ + @Override public long getLength() { // Not clear how to obtain this... seems to be used only for sorting splits return 0; } + /** + * Reads the values of each field. + * + * @param in The input to read from. + * @throws IOException When reading the input fails. + */ + @Override public void readFields(DataInput in) throws IOException { - this.m_tableName = Bytes.readByteArray(in); - this.m_startRow = Bytes.readByteArray(in); - this.m_endRow = Bytes.readByteArray(in); - this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in)); + tableName = Bytes.readByteArray(in); + startRow = Bytes.readByteArray(in); + endRow = Bytes.readByteArray(in); + regionLocation = Bytes.toString(Bytes.readByteArray(in)); } + /** + * Writes the field values to the output. + * + * @param out The output to write to. + * @throws IOException When writing the values to the output fails. + */ + @Override public void write(DataOutput out) throws IOException { - Bytes.writeByteArray(out, this.m_tableName); - Bytes.writeByteArray(out, this.m_startRow); - Bytes.writeByteArray(out, this.m_endRow); - Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation)); + Bytes.writeByteArray(out, tableName); + Bytes.writeByteArray(out, startRow); + Bytes.writeByteArray(out, endRow); + Bytes.writeByteArray(out, Bytes.toBytes(regionLocation)); } + /** + * Returns the details about this instance as a string. + * + * @return The values of this instance as a string. + * @see java.lang.Object#toString() + */ @Override public String toString() { - return m_regionLocation + ":" + - Bytes.toStringBinary(m_startRow) + "," + Bytes.toStringBinary(m_endRow); + return regionLocation + ":" + + Bytes.toStringBinary(startRow) + "," + Bytes.toStringBinary(endRow); } - public int compareTo(TableSplit o) { - return Bytes.compareTo(getStartRow(), o.getStartRow()); + /** + * Compares this split against the given one. + * + * @param split The split to compare to. + * @return The result of the comparison. + * @see java.lang.Comparable#compareTo(java.lang.Object) + */ + @Override + public int compareTo(TableSplit split) { + return Bytes.compareTo(getStartRow(), split.getStartRow()); } + } diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/package-info.java b/src/java/org/apache/hadoop/hbase/mapreduce/package-info.java index a6789bd13a2..2158af480e7 100644 --- a/src/java/org/apache/hadoop/hbase/mapreduce/package-info.java +++ b/src/java/org/apache/hadoop/hbase/mapreduce/package-info.java @@ -75,10 +75,10 @@ directory and the hbase conf into a job jar conf/ directory.

    HBase can be used as a data source, {@link org.apache.hadoop.hbase.mapreduce.TableInputFormat TableInputFormat}, and data sink, {@link org.apache.hadoop.hbase.mapreduce.TableOutputFormat TableOutputFormat}, for MapReduce jobs. Writing MapReduce jobs that read or write HBase, you'll probably want to subclass -{@link org.apache.hadoop.hbase.mapreduce.TableMap TableMap} and/or -{@link org.apache.hadoop.hbase.mapreduce.TableReduce TableReduce}. See the do-nothing -pass-through classes {@link org.apache.hadoop.hbase.mapreduce.IdentityTableMap IdentityTableMap} and -{@link org.apache.hadoop.hbase.mapreduce.IdentityTableReduce IdentityTableReduce} for basic usage. For a more +{@link org.apache.hadoop.hbase.mapreduce.TableMapper TableMapper} and/or +{@link org.apache.hadoop.hbase.mapreduce.TableReducer TableReducer}. See the do-nothing +pass-through classes {@link org.apache.hadoop.hbase.mapreduce.IdentityTableMapper IdentityTableMapper} and +{@link org.apache.hadoop.hbase.mapreduce.IdentityTableReducer IdentityTableReducer} for basic usage. For a more involved example, see {@link org.apache.hadoop.hbase.mapreduce.BuildTableIndex BuildTableIndex} or review the org.apache.hadoop.hbase.mapreduce.TestTableMapReduce unit test.
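
For reference, the pieces introduced by this patch can be wired together roughly as follows. This is a minimal, untested sketch against the new API: the table names, the "contents" column family, and the CopyMapper class are invented for illustration, and the exact signatures of TableMapper and IdentityTableReducer (referenced elsewhere in this change) are assumed.

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

public class TableCopySketch {

  /** Hypothetical mapper: turns every scanned row into a Put for the sink table. */
  static class CopyMapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable row, Result columns,
        Context context) throws IOException, InterruptedException {
      Put put = new Put(row.get());
      for (KeyValue kv : columns.list()) {
        // Copy family, qualifier and value; timestamps are dropped in this sketch.
        put.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
      }
      context.write(row, put);
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job(new HBaseConfiguration(), "copy sourcetable to targettable");
    job.setJarByClass(TableCopySketch.class);

    // The Scan replaces the old COLUMN_LIST string; initTableMapperJob
    // serializes it into the job configuration under TableInputFormat.SCAN.
    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes("contents"));

    TableMapReduceUtil.initTableMapperJob("sourcetable", scan, CopyMapper.class,
      ImmutableBytesWritable.class, Put.class, job);
    TableMapReduceUtil.initTableReducerJob("targettable",
      IdentityTableReducer.class, job);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Because initTableReducerJob is called here without a partitioner, the default partitioner is used; passing HRegionPartitioner.class instead would also cap the number of reduce tasks at the region count of the output table, as the utility does in this patch.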