new package org.apache.hadoop.hbase.mapreduce

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@785055 13f79535-47bb-0310-9956-ffa450edef68
Andrew Kyle Purtell 2009-06-16 02:13:38 +00:00
parent ac9a9e01c9
commit 6b1aec5d6f
20 changed files with 2673 additions and 0 deletions

View File

@@ -0,0 +1,205 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
/**
* Example table column indexing class. Runs a mapreduce job to index
* specified table columns.
* <ul><li>Each row is modeled as a Lucene document: row key is indexed in
* its untokenized form, column name-value pairs are Lucene field name-value
* pairs.</li>
* <li>A file passed on command line is used to populate an
* {@link IndexConfiguration} which is used to set various Lucene parameters,
* specify whether to optimize an index and which columns to index and/or
* store, in tokenized or untokenized form, etc. For an example, see the
* <code>createIndexConfContent</code> method in TestTableIndex
* </li>
* <li>The number of reduce tasks decides the number of indexes (partitions).
* The index(es) are stored in the output path of the job configuration.</li>
* <li>The index build process is done in the reduce phase. Users can use
* the map phase to join rows from different tables or to pre-parse/analyze
* column content, etc.</li>
* </ul>
*/
public class BuildTableIndex {
private static final String USAGE = "Usage: BuildTableIndex " +
"-m <numMapTasks> -r <numReduceTasks>\n -indexConf <iconfFile> " +
"-indexDir <indexDir>\n -table <tableName> -columns <columnName1> " +
"[<columnName2> ...]";
private static void printUsage(String message) {
System.err.println(message);
System.err.println(USAGE);
System.exit(-1);
}
/** default constructor */
public BuildTableIndex() {
super();
}
/**
* @param args
* @throws IOException
*/
public void run(String[] args) throws IOException {
if (args.length < 6) {
printUsage("Too few arguments");
}
int numMapTasks = 1;
int numReduceTasks = 1;
String iconfFile = null;
String indexDir = null;
String tableName = null;
StringBuffer columnNames = null;
// parse args
for (int i = 0; i < args.length - 1; i++) {
if ("-m".equals(args[i])) {
numMapTasks = Integer.parseInt(args[++i]);
} else if ("-r".equals(args[i])) {
numReduceTasks = Integer.parseInt(args[++i]);
} else if ("-indexConf".equals(args[i])) {
iconfFile = args[++i];
} else if ("-indexDir".equals(args[i])) {
indexDir = args[++i];
} else if ("-table".equals(args[i])) {
tableName = args[++i];
} else if ("-columns".equals(args[i])) {
columnNames = new StringBuffer(args[++i]);
while (i + 1 < args.length && !args[i + 1].startsWith("-")) {
columnNames.append(" ");
columnNames.append(args[++i]);
}
} else {
printUsage("Unsupported option " + args[i]);
}
}
if (indexDir == null || tableName == null || columnNames == null) {
printUsage("Index directory, table name and at least one column must " +
"be specified");
}
Configuration conf = new HBaseConfiguration();
if (iconfFile != null) {
// set index configuration content from a file
String content = readContent(iconfFile);
IndexConfiguration iconf = new IndexConfiguration();
// purely to validate, exception will be thrown if not valid
iconf.addFromXML(content);
conf.set("hbase.index.conf", content);
}
if (columnNames != null) {
JobConf jobConf = createJob(conf, numMapTasks, numReduceTasks, indexDir,
tableName, columnNames.toString());
JobClient.runJob(jobConf);
}
}
/**
* @param conf
* @param numMapTasks
* @param numReduceTasks
* @param indexDir
* @param tableName
* @param columnNames
* @return JobConf
*/
public JobConf createJob(Configuration conf, int numMapTasks,
int numReduceTasks, String indexDir, String tableName,
String columnNames) {
JobConf jobConf = new JobConf(conf, BuildTableIndex.class);
jobConf.setJobName("build index for table " + tableName);
jobConf.setNumMapTasks(numMapTasks);
// number of indexes to partition into
jobConf.setNumReduceTasks(numReduceTasks);
// use identity map (a waste, but just as an example)
IdentityTableMap.initJob(tableName, columnNames, IdentityTableMap.class,
jobConf);
// use IndexTableReduce to build a Lucene index
jobConf.setReducerClass(IndexTableReduce.class);
FileOutputFormat.setOutputPath(jobConf, new Path(indexDir));
jobConf.setOutputFormat(IndexOutputFormat.class);
return jobConf;
}
/*
* Read xml file of indexing configurations. The xml format is similar to
* hbase-default.xml and hadoop-default.xml. For an example configuration,
* see the <code>createIndexConfContent</code> method in TestTableIndex
* @param fileName File to read.
* @return XML configuration read from file
* @throws IOException
*/
private String readContent(String fileName) throws IOException {
File file = new File(fileName);
int length = (int) file.length();
if (length == 0) {
printUsage("Index configuration file " + fileName + " does not exist or is empty");
}
int bytesRead = 0;
byte[] bytes = new byte[length];
FileInputStream fis = new FileInputStream(file);
try {
// read entire file into content
while (bytesRead < length) {
int read = fis.read(bytes, bytesRead, length - bytesRead);
if (read > 0) {
bytesRead += read;
} else {
break;
}
}
} finally {
fis.close();
}
return new String(bytes, 0, bytesRead, HConstants.UTF8_ENCODING);
}
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
BuildTableIndex build = new BuildTableIndex();
build.run(args);
}
}
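
For illustration, a minimal sketch of driving this class programmatically rather than from the command line; the map/reduce counts, paths, table, and column names are placeholders, and the option names follow the USAGE string above.

import org.apache.hadoop.hbase.mapreduce.BuildTableIndex;

public class BuildTableIndexExample {
  public static void main(String[] args) throws Exception {
    // All values are placeholders; -indexConf is optional per run() above.
    new BuildTableIndex().run(new String[] {
        "-m", "4", "-r", "2",
        "-indexConf", "/tmp/index-conf.xml",
        "-indexDir", "/user/hbase/index-out",
        "-table", "webtable",
        "-columns", "contents:", "anchor:"
    });
  }
}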

View File

@@ -0,0 +1,39 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import org.apache.hadoop.util.ProgramDriver;
/**
* Driver for HBase MapReduce jobs. Select which to run by passing the
* name of the job to this main.
*/
public class Driver {
/**
* @param args
* @throws Throwable
*/
public static void main(String[] args) throws Throwable {
ProgramDriver pgd = new ProgramDriver();
pgd.addClass(RowCounter.NAME, RowCounter.class,
"Count rows in HBase table");
pgd.driver(args);
}
}
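
As a sketch of how further jobs from this package could be exposed through the same ProgramDriver pattern, the hypothetical driver below also registers BuildTableIndex (which has its own static main) under an assumed program name.

import org.apache.hadoop.hbase.mapreduce.BuildTableIndex;
import org.apache.hadoop.hbase.mapreduce.RowCounter;
import org.apache.hadoop.util.ProgramDriver;

public class ExtendedDriver {
  public static void main(String[] args) throws Throwable {
    ProgramDriver pgd = new ProgramDriver();
    pgd.addClass(RowCounter.NAME, RowCounter.class,
        "Count rows in HBase table");
    // Hypothetical extra entry; the program name "buildtableindex" is assumed.
    pgd.addClass("buildtableindex", BuildTableIndex.class,
        "Build a Lucene index over selected table columns");
    pgd.driver(args);
  }
}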

View File

@@ -0,0 +1,158 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
/**
* Extract grouping columns from input record
*/
public class GroupingTableMap
extends MapReduceBase
implements TableMap<ImmutableBytesWritable,Result> {
/**
* JobConf parameter to specify the columns used to produce the key passed to
* collect from the map phase
*/
public static final String GROUP_COLUMNS =
"hbase.mapred.groupingtablemap.columns";
protected byte [][] m_columns;
/**
* Use this before submitting a TableMap job. It will appropriately set up the
* JobConf.
*
* @param table table to be processed
* @param columns space separated list of columns to fetch
* @param groupColumns space separated list of columns used to form the key
* used in collect
* @param mapper map class
* @param job job configuration object
*/
@SuppressWarnings("unchecked")
public static void initJob(String table, String columns, String groupColumns,
Class<? extends TableMap> mapper, JobConf job) {
TableMapReduceUtil.initTableMapJob(table, columns, mapper,
ImmutableBytesWritable.class, Result.class, job);
job.set(GROUP_COLUMNS, groupColumns);
}
@Override
public void configure(JobConf job) {
super.configure(job);
String[] cols = job.get(GROUP_COLUMNS, "").split(" ");
m_columns = new byte[cols.length][];
for(int i = 0; i < cols.length; i++) {
m_columns[i] = Bytes.toBytes(cols[i]);
}
}
/**
* Extract the grouping columns from value to construct a new key.
*
* Pass the new key and value to reduce.
* If any of the grouping columns are not found in the value, the record is skipped.
* @param key
* @param value
* @param output
* @param reporter
* @throws IOException
*/
public void map(ImmutableBytesWritable key, Result value,
OutputCollector<ImmutableBytesWritable,Result> output,
Reporter reporter) throws IOException {
byte[][] keyVals = extractKeyValues(value);
if(keyVals != null) {
ImmutableBytesWritable tKey = createGroupKey(keyVals);
output.collect(tKey, value);
}
}
/**
* Extract column values from the current record. This method returns
* null if any of the columns are not found.
*
* Override this method if you want to deal with nulls differently.
*
* @param r
* @return array of byte values
*/
protected byte[][] extractKeyValues(Result r) {
byte[][] keyVals = null;
ArrayList<byte[]> foundList = new ArrayList<byte[]>();
int numCols = m_columns.length;
if (numCols > 0) {
for (KeyValue value: r.list()) {
byte [] column = value.getColumn();
for (int i = 0; i < numCols; i++) {
if (Bytes.equals(column, m_columns[i])) {
foundList.add(value.getValue());
break;
}
}
}
if(foundList.size() == numCols) {
keyVals = foundList.toArray(new byte[numCols][]);
}
}
return keyVals;
}
/**
* Create a key by concatenating multiple column values.
* Override this function in order to produce different types of keys.
*
* @param vals
* @return key generated by concatenating multiple column values
*/
protected ImmutableBytesWritable createGroupKey(byte[][] vals) {
if(vals == null) {
return null;
}
StringBuilder sb = new StringBuilder();
for(int i = 0; i < vals.length; i++) {
if(i > 0) {
sb.append(" ");
}
try {
sb.append(new String(vals[i], HConstants.UTF8_ENCODING));
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
return new ImmutableBytesWritable(Bytes.toBytes(sb.toString()));
}
}
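
A brief sketch of wiring this mapper into a JobConf via its initJob helper; the table, scanned columns, and grouping columns are placeholder names.

import org.apache.hadoop.hbase.mapreduce.GroupingTableMap;
import org.apache.hadoop.mapred.JobConf;

public class GroupingTableMapExample {
  public static void setup(JobConf job) {
    // Scan "contents:" and "anchor:", and build the map output key from the
    // values of "anchor:site" and "anchor:date" (all names hypothetical).
    GroupingTableMap.initJob("webtable", "contents: anchor:",
        "anchor:site anchor:date", GroupingTableMap.class, job);
  }
}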

View File

@@ -0,0 +1,89 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
/**
* This is used to partition the output keys into groups of keys.
* Keys are grouped according to the regions that currently exist,
* so that each reducer fills a single region and load is distributed.
*
* @param <K2>
* @param <V2>
*/
public class HRegionPartitioner<K2,V2>
implements Partitioner<ImmutableBytesWritable, V2> {
private final Log LOG = LogFactory.getLog(HRegionPartitioner.class);
private HTable table;
private byte[][] startKeys;
public void configure(JobConf job) {
try {
this.table = new HTable(new HBaseConfiguration(job),
job.get(TableOutputFormat.OUTPUT_TABLE));
} catch (IOException e) {
LOG.error(e);
}
try {
this.startKeys = this.table.getStartKeys();
} catch (IOException e) {
LOG.error(e);
}
}
public int getPartition(ImmutableBytesWritable key,
V2 value, int numPartitions) {
byte[] region = null;
// Only one region return 0
if (this.startKeys.length == 1){
return 0;
}
try {
// Not sure if this is cached after a split so we could have problems
// here if a region splits while mapping
region = table.getRegionLocation(key.get()).getRegionInfo().getStartKey();
} catch (IOException e) {
LOG.error(e);
}
for (int i = 0; i < this.startKeys.length; i++){
if (Bytes.compareTo(region, this.startKeys[i]) == 0 ){
if (i >= numPartitions-1){
// handle the case where we have fewer reducers than regions.
return (Integer.toString(i).hashCode()
& Integer.MAX_VALUE) % numPartitions;
}
return i;
}
}
// if the above fails to find a matching start key, return something
return 0;
}
}
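
A sketch of plugging this partitioner into a job that writes back to a table; it assumes TableOutputFormat in this package exposes the OUTPUT_TABLE property read by configure() above, and the table name and reducer count are placeholders.

import org.apache.hadoop.hbase.mapreduce.HRegionPartitioner;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapred.JobConf;

public class HRegionPartitionerExample {
  public static void setup(JobConf job) {
    // Send each reduce key to the reducer covering its target region.
    job.set(TableOutputFormat.OUTPUT_TABLE, "webtable"); // placeholder table
    job.setOutputFormat(TableOutputFormat.class);
    job.setPartitionerClass(HRegionPartitioner.class);
    job.setNumReduceTasks(16);                           // placeholder count
  }
}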

View File

@@ -0,0 +1,74 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
/**
* Pass the given key and record as-is to reduce
*/
public class IdentityTableMap
extends MapReduceBase
implements TableMap<ImmutableBytesWritable, Result> {
/** constructor */
public IdentityTableMap() {
super();
}
/**
* Use this before submitting a TableMap job. It will
* appropriately set up the JobConf.
*
* @param table table name
* @param columns columns to scan
* @param mapper mapper class
* @param job job configuration
*/
@SuppressWarnings("unchecked")
public static void initJob(String table, String columns,
Class<? extends TableMap> mapper, JobConf job) {
TableMapReduceUtil.initTableMapJob(table, columns, mapper,
ImmutableBytesWritable.class, Result.class, job);
}
/**
* Pass the key, value to reduce
* @param key
* @param value
* @param output
* @param reporter
* @throws IOException
*/
public void map(ImmutableBytesWritable key, Result value,
OutputCollector<ImmutableBytesWritable,Result> output,
Reporter reporter) throws IOException {
// pass through the key and value unchanged
output.collect(key, value);
}
}

View File

@@ -0,0 +1,59 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
/**
* Write to table each key, record pair
*/
public class IdentityTableReduce
extends MapReduceBase
implements TableReduce<ImmutableBytesWritable, Put> {
@SuppressWarnings("unused")
private static final Log LOG =
LogFactory.getLog(IdentityTableReduce.class.getName());
/**
* No aggregation, output pairs of (key, record)
* @param key
* @param values
* @param output
* @param reporter
* @throws IOException
*/
public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter)
throws IOException {
while(values.hasNext()) {
output.collect(key, values.next());
}
}
}
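
As a sketch of where this reducer fits, the hypothetical job below pairs it with a small TableMap that emits one Put per scanned row; it assumes TableMapReduceUtil in this package provides an initTableReduceJob(table, reducer, job) helper alongside the initTableMapJob used elsewhere in this commit, and all table, family, and column names are placeholders.

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IdentityTableReduce;
import org.apache.hadoop.hbase.mapreduce.TableMap;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/** Writes one cell per scanned row into a target table; names are placeholders. */
public class AuditTableMap extends MapReduceBase
    implements TableMap<ImmutableBytesWritable, Put> {

  public void map(ImmutableBytesWritable row, Result value,
      OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter)
      throws IOException {
    // Record the number of cells seen for this row under "audit:cells".
    Put put = new Put(row.get());
    put.add(Bytes.toBytes("audit"), Bytes.toBytes("cells"),
        Bytes.toBytes(value.size()));
    output.collect(row, put);
  }

  public static void setup(JobConf job) {
    // Scan "info:" from the source table; IdentityTableReduce writes the Puts.
    TableMapReduceUtil.initTableMapJob("source_table", "info:",
        AuditTableMap.class, ImmutableBytesWritable.class, Put.class, job);
    TableMapReduceUtil.initTableReduceJob("target_table",
        IdentityTableReduce.class, job);
  }
}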

View File

@@ -0,0 +1,422 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.ByteArrayInputStream;
import java.io.OutputStream;
import java.io.StringWriter;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.w3c.dom.Text;
/**
* Configuration parameters for building a Lucene index
*/
public class IndexConfiguration extends Configuration {
private static final Log LOG = LogFactory.getLog(IndexConfiguration.class);
static final String HBASE_COLUMN_NAME = "hbase.column.name";
static final String HBASE_COLUMN_STORE = "hbase.column.store";
static final String HBASE_COLUMN_INDEX = "hbase.column.index";
static final String HBASE_COLUMN_TOKENIZE = "hbase.column.tokenize";
static final String HBASE_COLUMN_BOOST = "hbase.column.boost";
static final String HBASE_COLUMN_OMIT_NORMS = "hbase.column.omit.norms";
static final String HBASE_INDEX_ROWKEY_NAME = "hbase.index.rowkey.name";
static final String HBASE_INDEX_ANALYZER_NAME = "hbase.index.analyzer.name";
static final String HBASE_INDEX_MAX_BUFFERED_DOCS =
"hbase.index.max.buffered.docs";
static final String HBASE_INDEX_MAX_BUFFERED_DELS =
"hbase.index.max.buffered.dels";
static final String HBASE_INDEX_MAX_FIELD_LENGTH =
"hbase.index.max.field.length";
static final String HBASE_INDEX_MAX_MERGE_DOCS =
"hbase.index.max.merge.docs";
static final String HBASE_INDEX_MERGE_FACTOR = "hbase.index.merge.factor";
// double ramBufferSizeMB;
static final String HBASE_INDEX_SIMILARITY_NAME =
"hbase.index.similarity.name";
static final String HBASE_INDEX_USE_COMPOUND_FILE =
"hbase.index.use.compound.file";
static final String HBASE_INDEX_OPTIMIZE = "hbase.index.optimize";
public static class ColumnConf extends Properties {
private static final long serialVersionUID = 7419012290580607821L;
boolean getBoolean(String name, boolean defaultValue) {
String valueString = getProperty(name);
if ("true".equals(valueString))
return true;
else if ("false".equals(valueString))
return false;
else
return defaultValue;
}
void setBoolean(String name, boolean value) {
setProperty(name, Boolean.toString(value));
}
float getFloat(String name, float defaultValue) {
String valueString = getProperty(name);
if (valueString == null)
return defaultValue;
try {
return Float.parseFloat(valueString);
} catch (NumberFormatException e) {
return defaultValue;
}
}
void setFloat(String name, float value) {
setProperty(name, Float.toString(value));
}
}
private Map<String, ColumnConf> columnMap =
new ConcurrentHashMap<String, ColumnConf>();
public Iterator<String> columnNameIterator() {
return columnMap.keySet().iterator();
}
public boolean isIndex(String columnName) {
return getColumn(columnName).getBoolean(HBASE_COLUMN_INDEX, true);
}
public void setIndex(String columnName, boolean index) {
getColumn(columnName).setBoolean(HBASE_COLUMN_INDEX, index);
}
public boolean isStore(String columnName) {
return getColumn(columnName).getBoolean(HBASE_COLUMN_STORE, false);
}
public void setStore(String columnName, boolean store) {
getColumn(columnName).setBoolean(HBASE_COLUMN_STORE, store);
}
public boolean isTokenize(String columnName) {
return getColumn(columnName).getBoolean(HBASE_COLUMN_TOKENIZE, true);
}
public void setTokenize(String columnName, boolean tokenize) {
getColumn(columnName).setBoolean(HBASE_COLUMN_TOKENIZE, tokenize);
}
public float getBoost(String columnName) {
return getColumn(columnName).getFloat(HBASE_COLUMN_BOOST, 1.0f);
}
public void setBoost(String columnName, float boost) {
getColumn(columnName).setFloat(HBASE_COLUMN_BOOST, boost);
}
public boolean isOmitNorms(String columnName) {
return getColumn(columnName).getBoolean(HBASE_COLUMN_OMIT_NORMS, true);
}
public void setOmitNorms(String columnName, boolean omitNorms) {
getColumn(columnName).setBoolean(HBASE_COLUMN_OMIT_NORMS, omitNorms);
}
private ColumnConf getColumn(String columnName) {
ColumnConf column = columnMap.get(columnName);
if (column == null) {
column = new ColumnConf();
columnMap.put(columnName, column);
}
return column;
}
public String getAnalyzerName() {
return get(HBASE_INDEX_ANALYZER_NAME,
"org.apache.lucene.analysis.standard.StandardAnalyzer");
}
public void setAnalyzerName(String analyzerName) {
set(HBASE_INDEX_ANALYZER_NAME, analyzerName);
}
public int getMaxBufferedDeleteTerms() {
return getInt(HBASE_INDEX_MAX_BUFFERED_DELS, 1000);
}
public void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
setInt(HBASE_INDEX_MAX_BUFFERED_DELS, maxBufferedDeleteTerms);
}
public int getMaxBufferedDocs() {
return getInt(HBASE_INDEX_MAX_BUFFERED_DOCS, 10);
}
public void setMaxBufferedDocs(int maxBufferedDocs) {
setInt(HBASE_INDEX_MAX_BUFFERED_DOCS, maxBufferedDocs);
}
public int getMaxFieldLength() {
return getInt(HBASE_INDEX_MAX_FIELD_LENGTH, Integer.MAX_VALUE);
}
public void setMaxFieldLength(int maxFieldLength) {
setInt(HBASE_INDEX_MAX_FIELD_LENGTH, maxFieldLength);
}
public int getMaxMergeDocs() {
return getInt(HBASE_INDEX_MAX_MERGE_DOCS, Integer.MAX_VALUE);
}
public void setMaxMergeDocs(int maxMergeDocs) {
setInt(HBASE_INDEX_MAX_MERGE_DOCS, maxMergeDocs);
}
public int getMergeFactor() {
return getInt(HBASE_INDEX_MERGE_FACTOR, 10);
}
public void setMergeFactor(int mergeFactor) {
setInt(HBASE_INDEX_MERGE_FACTOR, mergeFactor);
}
public String getRowkeyName() {
return get(HBASE_INDEX_ROWKEY_NAME, "ROWKEY");
}
public void setRowkeyName(String rowkeyName) {
set(HBASE_INDEX_ROWKEY_NAME, rowkeyName);
}
public String getSimilarityName() {
return get(HBASE_INDEX_SIMILARITY_NAME, null);
}
public void setSimilarityName(String similarityName) {
set(HBASE_INDEX_SIMILARITY_NAME, similarityName);
}
public boolean isUseCompoundFile() {
return getBoolean(HBASE_INDEX_USE_COMPOUND_FILE, false);
}
public void setUseCompoundFile(boolean useCompoundFile) {
setBoolean(HBASE_INDEX_USE_COMPOUND_FILE, useCompoundFile);
}
public boolean doOptimize() {
return getBoolean(HBASE_INDEX_OPTIMIZE, true);
}
public void setDoOptimize(boolean doOptimize) {
setBoolean(HBASE_INDEX_OPTIMIZE, doOptimize);
}
public void addFromXML(String content) {
try {
DocumentBuilder builder = DocumentBuilderFactory.newInstance()
.newDocumentBuilder();
Document doc = builder
.parse(new ByteArrayInputStream(content.getBytes()));
Element root = doc.getDocumentElement();
if (!"configuration".equals(root.getTagName())) {
LOG.fatal("bad conf file: top-level element not <configuration>");
}
NodeList props = root.getChildNodes();
for (int i = 0; i < props.getLength(); i++) {
Node propNode = props.item(i);
if (!(propNode instanceof Element)) {
continue;
}
Element prop = (Element) propNode;
if ("property".equals(prop.getTagName())) {
propertyFromXML(prop, null);
} else if ("column".equals(prop.getTagName())) {
columnConfFromXML(prop);
} else {
LOG.warn("bad conf content: element neither <property> nor <column>");
}
}
} catch (Exception e) {
LOG.fatal("error parsing conf content: " + e);
throw new RuntimeException(e);
}
}
private void propertyFromXML(Element prop, Properties properties) {
NodeList fields = prop.getChildNodes();
String attr = null;
String value = null;
for (int j = 0; j < fields.getLength(); j++) {
Node fieldNode = fields.item(j);
if (!(fieldNode instanceof Element)) {
continue;
}
Element field = (Element) fieldNode;
if ("name".equals(field.getTagName())) {
attr = ((Text) field.getFirstChild()).getData();
}
if ("value".equals(field.getTagName()) && field.hasChildNodes()) {
value = ((Text) field.getFirstChild()).getData();
}
}
if (attr != null && value != null) {
if (properties == null) {
set(attr, value);
} else {
properties.setProperty(attr, value);
}
}
}
private void columnConfFromXML(Element column) {
ColumnConf columnConf = new ColumnConf();
NodeList props = column.getChildNodes();
for (int i = 0; i < props.getLength(); i++) {
Node propNode = props.item(i);
if (!(propNode instanceof Element)) {
continue;
}
Element prop = (Element) propNode;
if ("property".equals(prop.getTagName())) {
propertyFromXML(prop, columnConf);
} else {
LOG.warn("bad conf content: element not <property>");
}
}
if (columnConf.getProperty(HBASE_COLUMN_NAME) != null) {
columnMap.put(columnConf.getProperty(HBASE_COLUMN_NAME), columnConf);
} else {
LOG.warn("bad column conf: name not specified");
}
}
public void write(OutputStream out) {
try {
Document doc = writeDocument();
DOMSource source = new DOMSource(doc);
StreamResult result = new StreamResult(out);
TransformerFactory transFactory = TransformerFactory.newInstance();
Transformer transformer = transFactory.newTransformer();
transformer.transform(source, result);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private Document writeDocument() {
Iterator<Map.Entry<String, String>> iter = iterator();
try {
Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
.newDocument();
Element conf = doc.createElement("configuration");
doc.appendChild(conf);
conf.appendChild(doc.createTextNode("\n"));
Map.Entry<String, String> entry;
while (iter.hasNext()) {
entry = iter.next();
String name = entry.getKey();
String value = entry.getValue();
writeProperty(doc, conf, name, value);
}
Iterator<String> columnIter = columnNameIterator();
while (columnIter.hasNext()) {
writeColumn(doc, conf, columnIter.next());
}
return doc;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void writeProperty(Document doc, Element parent, String name,
String value) {
Element propNode = doc.createElement("property");
parent.appendChild(propNode);
Element nameNode = doc.createElement("name");
nameNode.appendChild(doc.createTextNode(name));
propNode.appendChild(nameNode);
Element valueNode = doc.createElement("value");
valueNode.appendChild(doc.createTextNode(value));
propNode.appendChild(valueNode);
parent.appendChild(doc.createTextNode("\n"));
}
private void writeColumn(Document doc, Element parent, String columnName) {
Element column = doc.createElement("column");
parent.appendChild(column);
column.appendChild(doc.createTextNode("\n"));
ColumnConf columnConf = getColumn(columnName);
for (Map.Entry<Object, Object> entry : columnConf.entrySet()) {
if (entry.getKey() instanceof String
&& entry.getValue() instanceof String) {
writeProperty(doc, column, (String) entry.getKey(), (String) entry
.getValue());
}
}
}
@Override
public String toString() {
StringWriter writer = new StringWriter();
try {
Document doc = writeDocument();
DOMSource source = new DOMSource(doc);
StreamResult result = new StreamResult(writer);
TransformerFactory transFactory = TransformerFactory.newInstance();
Transformer transformer = transFactory.newTransformer();
transformer.transform(source, result);
} catch (Exception e) {
throw new RuntimeException(e);
}
return writer.toString();
}
}
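
A sketch of building an index configuration with the setters above and dumping it as XML via write(); the column name and parameter values are arbitrary placeholders, and the emitted XML is the kind of content BuildTableIndex consumes through -indexConf.

import org.apache.hadoop.hbase.mapreduce.IndexConfiguration;

public class IndexConfExample {
  public static void main(String[] args) {
    IndexConfiguration iconf = new IndexConfiguration();
    // Per-column settings for a hypothetical "contents:" column.
    iconf.setIndex("contents:", true);
    iconf.setTokenize("contents:", true);
    iconf.setStore("contents:", false);
    iconf.setBoost("contents:", 2.0f);
    // Index-wide Lucene settings.
    iconf.setMaxBufferedDocs(500);
    iconf.setMergeFactor(20);
    iconf.setDoOptimize(true);
    // Serialize to XML; addFromXML() can read this form back in.
    iconf.write(System.out);
  }
}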

View File

@@ -0,0 +1,163 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.Similarity;
/**
* Create a local index, unwrap Lucene documents created by reduce, add them to
* the index, and copy the index to the destination.
*/
public class IndexOutputFormat extends
FileOutputFormat<ImmutableBytesWritable, LuceneDocumentWrapper> {
static final Log LOG = LogFactory.getLog(IndexOutputFormat.class);
private Random random = new Random();
@Override
public RecordWriter<ImmutableBytesWritable, LuceneDocumentWrapper>
getRecordWriter(final FileSystem fs, JobConf job, String name,
final Progressable progress)
throws IOException {
final Path perm = new Path(FileOutputFormat.getOutputPath(job), name);
final Path temp = job.getLocalPath("index/_"
+ Integer.toString(random.nextInt()));
LOG.info("To index into " + perm);
// delete old, if any
fs.delete(perm, true);
final IndexConfiguration indexConf = new IndexConfiguration();
String content = job.get("hbase.index.conf");
if (content != null) {
indexConf.addFromXML(content);
}
String analyzerName = indexConf.getAnalyzerName();
Analyzer analyzer;
try {
Class<?> analyzerClass = Class.forName(analyzerName);
analyzer = (Analyzer) analyzerClass.newInstance();
} catch (Exception e) {
throw new IOException("Error in creating an analyzer object "
+ analyzerName);
}
// build locally first
final IndexWriter writer = new IndexWriter(fs.startLocalOutput(perm, temp)
.toString(), analyzer, true);
// no delete, so no need for maxBufferedDeleteTerms
writer.setMaxBufferedDocs(indexConf.getMaxBufferedDocs());
writer.setMaxFieldLength(indexConf.getMaxFieldLength());
writer.setMaxMergeDocs(indexConf.getMaxMergeDocs());
writer.setMergeFactor(indexConf.getMergeFactor());
String similarityName = indexConf.getSimilarityName();
if (similarityName != null) {
try {
Class<?> similarityClass = Class.forName(similarityName);
Similarity similarity = (Similarity) similarityClass.newInstance();
writer.setSimilarity(similarity);
} catch (Exception e) {
throw new IOException("Error in creating a similarty object "
+ similarityName);
}
}
writer.setUseCompoundFile(indexConf.isUseCompoundFile());
return new RecordWriter<ImmutableBytesWritable, LuceneDocumentWrapper>() {
boolean closed;
private long docCount = 0;
public void write(ImmutableBytesWritable key,
LuceneDocumentWrapper value)
throws IOException {
// unwrap and index doc
Document doc = value.get();
writer.addDocument(doc);
docCount++;
progress.progress();
}
public void close(final Reporter reporter) throws IOException {
// spawn a thread to give progress heartbeats
Thread prog = new Thread() {
@Override
public void run() {
while (!closed) {
try {
reporter.setStatus("closing");
Thread.sleep(1000);
} catch (InterruptedException e) {
continue;
} catch (Throwable e) {
return;
}
}
}
};
try {
prog.start();
// optimize index
if (indexConf.doOptimize()) {
if (LOG.isInfoEnabled()) {
LOG.info("Optimizing index.");
}
writer.optimize();
}
// close index
writer.close();
if (LOG.isInfoEnabled()) {
LOG.info("Done indexing " + docCount + " docs.");
}
// copy to perm destination in dfs
fs.completeLocalOutput(perm, temp);
if (LOG.isInfoEnabled()) {
LOG.info("Copy done.");
}
} finally {
closed = true;
}
}
};
}
}

View File

@@ -0,0 +1,109 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Construct a Lucene document per row, which is consumed by IndexOutputFormat
* to build a Lucene index
*/
public class IndexTableReduce extends MapReduceBase implements
Reducer<ImmutableBytesWritable, Result, ImmutableBytesWritable, LuceneDocumentWrapper> {
private static final Log LOG = LogFactory.getLog(IndexTableReduce.class);
private IndexConfiguration indexConf;
@Override
public void configure(JobConf job) {
super.configure(job);
indexConf = new IndexConfiguration();
String content = job.get("hbase.index.conf");
if (content != null) {
indexConf.addFromXML(content);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Index conf: " + indexConf);
}
}
@Override
public void close() throws IOException {
super.close();
}
public void reduce(ImmutableBytesWritable key, Iterator<Result> values,
OutputCollector<ImmutableBytesWritable, LuceneDocumentWrapper> output,
Reporter reporter)
throws IOException {
if (!values.hasNext()) {
return;
}
Document doc = new Document();
// index and store row key, row key already UTF-8 encoded
Field keyField = new Field(indexConf.getRowkeyName(),
Bytes.toString(key.get(), key.getOffset(), key.getLength()),
Field.Store.YES, Field.Index.UN_TOKENIZED);
keyField.setOmitNorms(true);
doc.add(keyField);
while (values.hasNext()) {
Result value = values.next();
// each column (name-value pair) is a field (name-value pair)
for (KeyValue kv: value.list()) {
// name is already UTF-8 encoded
String column = Bytes.toString(kv.getColumn());
byte[] columnValue = kv.getValue();
Field.Store store = indexConf.isStore(column)?
Field.Store.YES: Field.Store.NO;
Field.Index index = indexConf.isIndex(column)?
(indexConf.isTokenize(column)?
Field.Index.TOKENIZED: Field.Index.UN_TOKENIZED):
Field.Index.NO;
// UTF-8 encode value
Field field = new Field(column, Bytes.toString(columnValue),
store, index);
field.setBoost(indexConf.getBoost(column));
field.setOmitNorms(indexConf.isOmitNorms(column));
doc.add(field);
}
}
output.collect(key, new LuceneDocumentWrapper(doc));
}
}

View File

@@ -0,0 +1,55 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import org.apache.hadoop.io.Writable;
import org.apache.lucene.document.Document;
/**
* A utility class used to pass a Lucene document from reduce to OutputFormat.
* It doesn't really serialize/deserialize a Lucene document.
*/
public class LuceneDocumentWrapper implements Writable {
protected Document doc;
/**
* @param doc
*/
public LuceneDocumentWrapper(Document doc) {
this.doc = doc;
}
/**
* @return the document
*/
public Document get() {
return doc;
}
public void readFields(DataInput in) {
// intentionally left blank
}
public void write(DataOutput out) {
// intentionally left blank
}
}

View File

@@ -0,0 +1,134 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* A job with a map to count rows.
* The map increments a ROWS counter for each input row that has at least one
* column with non-empty content; this is a map-only job (zero reduce tasks).
*/
public class RowCounter extends Configured implements Tool {
// Name of this 'program'
static final String NAME = "rowcounter";
/**
* Mapper that runs the count.
*/
static class RowCounterMapper
implements TableMap<ImmutableBytesWritable, Result> {
private static enum Counters {ROWS}
public void map(ImmutableBytesWritable row, Result values,
OutputCollector<ImmutableBytesWritable, Result> output,
Reporter reporter)
throws IOException {
boolean content = false;
for (KeyValue value: values.list()) {
if (value.getValue().length > 0) {
content = true;
break;
}
}
if (!content) {
// Don't count rows that are all empty values.
return;
}
// We only care that the row had content; just bump the ROWS counter.
reporter.incrCounter(Counters.ROWS, 1);
}
public void configure(JobConf jc) {
// Nothing to do.
}
public void close() throws IOException {
// Nothing to do.
}
}
/**
* @param args
* @return the JobConf
* @throws IOException
*/
public JobConf createSubmittableJob(String[] args) throws IOException {
JobConf c = new JobConf(getConf(), getClass());
c.setJobName(NAME);
// Columns are space delimited
StringBuilder sb = new StringBuilder();
final int columnoffset = 2;
for (int i = columnoffset; i < args.length; i++) {
if (i > columnoffset) {
sb.append(" ");
}
sb.append(args[i]);
}
// Second argument is the table name.
TableMapReduceUtil.initTableMapJob(args[1], sb.toString(),
RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, c);
c.setNumReduceTasks(0);
// First arg is the output directory.
FileOutputFormat.setOutputPath(c, new Path(args[0]));
return c;
}
static int printUsage() {
System.out.println(NAME +
" <outputdir> <tablename> <column1> [<column2>...]");
return -1;
}
public int run(final String[] args) throws Exception {
// Make sure there are at least 3 parameters
if (args.length < 3) {
System.err.println("ERROR: Wrong number of parameters: " + args.length);
return printUsage();
}
JobClient.runJob(createSubmittableJob(args));
return 0;
}
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
HBaseConfiguration c = new HBaseConfiguration();
int errCode = ToolRunner.run(c, new RowCounter(), args);
System.exit(errCode);
}
}
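
For completeness, a sketch of launching RowCounter through ToolRunner, mirroring main() above; the output directory, table name, and column are placeholders.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.RowCounter;
import org.apache.hadoop.util.ToolRunner;

public class RowCounterExample {
  public static void main(String[] args) throws Exception {
    // Arguments: <outputdir> <tablename> <column1> [<column2>...]
    int rc = ToolRunner.run(new HBaseConfiguration(), new RowCounter(),
        new String[] { "/tmp/rowcounter-out", "webtable", "info:" });
    System.exit(rc);
  }
}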

View File

@@ -0,0 +1,6 @@
# ResourceBundle properties file for RowCounter MR job
CounterGroupName= RowCounter
ROWS.name= Rows

View File

@@ -0,0 +1,82 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.util.StringUtils;
/**
* Convert HBase tabular data into a format that is consumable by Map/Reduce.
*/
public class TableInputFormat extends TableInputFormatBase implements
JobConfigurable {
private final Log LOG = LogFactory.getLog(TableInputFormat.class);
/**
* space delimited list of columns
*/
public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";
public void configure(JobConf job) {
Path[] tableNames = FileInputFormat.getInputPaths(job);
String colArg = job.get(COLUMN_LIST);
String[] colNames = colArg.split(" ");
byte [][] m_cols = new byte[colNames.length][];
for (int i = 0; i < m_cols.length; i++) {
m_cols[i] = Bytes.toBytes(colNames[i]);
}
setInputColumns(m_cols);
try {
setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName()));
} catch (Exception e) {
LOG.error(StringUtils.stringifyException(e));
}
}
public void validateInput(JobConf job) throws IOException {
// expecting exactly one path
Path [] tableNames = FileInputFormat.getInputPaths(job);
if (tableNames == null || tableNames.length != 1) {
throw new IOException("expecting one table name");
}
// connected to table?
if (getHTable() == null) {
throw new IOException("could not connect to table '" +
tableNames[0].getName() + "'");
}
// expecting at least one column
String colArg = job.get(COLUMN_LIST);
if (colArg == null || colArg.length() == 0) {
throw new IOException("expecting at least one column");
}
}
}
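
A sketch of configuring a job against this input format directly: per configure() above, the table name travels as the single input path and the scanned columns in COLUMN_LIST; the table and column names here are placeholders (TableMapReduceUtil's initTableMapJob presumably does this wiring for you).

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;

public class TableInputFormatExample {
  public static void setup(JobConf job) {
    job.setInputFormat(TableInputFormat.class);
    // configure() treats the single input "path" as the table name.
    FileInputFormat.setInputPaths(job, new Path("webtable"));
    // Space-delimited list of columns to scan.
    job.set(TableInputFormat.COLUMN_LIST, "info: contents:");
  }
}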

View File

@@ -0,0 +1,347 @@
/**
* Copyright 2009 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.filter.StopRowFilter;
import org.apache.hadoop.hbase.filter.WhileMatchRowFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.StringUtils;
/**
* A base for {@link TableInputFormat}s. Receives an {@link HTable}, a
* byte[][] of input columns, and optionally a {@link RowFilterInterface}.
* Subclasses may use other TableRecordReader implementations.
* <p>
* An example of a subclass:
* <pre>
* class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
*
* public void configure(JobConf job) {
* HTable exampleTable = new HTable(new HBaseConfiguration(job),
* Bytes.toBytes("exampleTable"));
* // mandatory
* setHTable(exampleTable);
* byte [][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
* Bytes.toBytes("columnB") };
* // mandatory
* setInputColumns(inputColumns);
* RowFilterInterface exampleFilter = new RegExpRowFilter("keyPrefix.*");
* // optional
* setRowFilter(exampleFilter);
* }
*
* public void validateInput(JobConf job) throws IOException {
* }
* }
* </pre>
*/
public abstract class TableInputFormatBase
implements InputFormat<ImmutableBytesWritable, Result> {
final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
private byte [][] inputColumns;
private HTable table;
private TableRecordReader tableRecordReader;
private RowFilterInterface rowFilter;
/**
* Iterate over HBase table data, returning (ImmutableBytesWritable, Result) pairs
*/
protected class TableRecordReader
implements RecordReader<ImmutableBytesWritable, Result> {
private byte [] startRow;
private byte [] endRow;
private byte [] lastRow;
private RowFilterInterface trrRowFilter;
private ResultScanner scanner;
private HTable htable;
private byte [][] trrInputColumns;
/**
* Restart from survivable exceptions by creating a new scanner.
*
* @param firstRow
* @throws IOException
*/
public void restart(byte[] firstRow) throws IOException {
if ((endRow != null) && (endRow.length > 0)) {
if (trrRowFilter != null) {
final Set<RowFilterInterface> rowFiltersSet =
new HashSet<RowFilterInterface>();
rowFiltersSet.add(new WhileMatchRowFilter(new StopRowFilter(endRow)));
rowFiltersSet.add(trrRowFilter);
Scan scan = new Scan(startRow);
scan.addColumns(trrInputColumns);
// scan.setFilter(new RowFilterSet(RowFilterSet.Operator.MUST_PASS_ALL,
// rowFiltersSet));
this.scanner = this.htable.getScanner(scan);
} else {
Scan scan = new Scan(firstRow, endRow);
scan.addColumns(trrInputColumns);
this.scanner = this.htable.getScanner(scan);
}
} else {
Scan scan = new Scan(firstRow);
scan.addColumns(trrInputColumns);
// scan.setFilter(trrRowFilter);
this.scanner = this.htable.getScanner(scan);
}
}
/**
* Build the scanner. Not done in constructor to allow for extension.
*
* @throws IOException
*/
public void init() throws IOException {
restart(startRow);
}
/**
* @param htable the {@link HTable} to scan.
*/
public void setHTable(HTable htable) {
this.htable = htable;
}
/**
* @param inputColumns the columns to be placed in the {@link Result}.
*/
public void setInputColumns(final byte [][] inputColumns) {
this.trrInputColumns = inputColumns;
}
/**
* @param startRow the first row in the split
*/
public void setStartRow(final byte [] startRow) {
this.startRow = startRow;
}
/**
*
* @param endRow the last row in the split
*/
public void setEndRow(final byte [] endRow) {
this.endRow = endRow;
}
/**
* @param rowFilter the {@link RowFilterInterface} to be used.
*/
public void setRowFilter(RowFilterInterface rowFilter) {
this.trrRowFilter = rowFilter;
}
public void close() {
this.scanner.close();
}
/**
* @return ImmutableBytesWritable
*
* @see org.apache.hadoop.mapred.RecordReader#createKey()
*/
public ImmutableBytesWritable createKey() {
return new ImmutableBytesWritable();
}
/**
* @return Result
*
* @see org.apache.hadoop.mapred.RecordReader#createValue()
*/
public Result createValue() {
return new Result();
}
public long getPos() {
// This should be the ordinal tuple in the range;
// not clear how to calculate...
return 0;
}
public float getProgress() {
// Depends on the total number of tuples and getPos
return 0;
}
/**
* @param key ImmutableBytesWritable as input key.
* @param value Result as input value
* @return true if there was more data
* @throws IOException
*/
public boolean next(ImmutableBytesWritable key, Result value)
throws IOException {
Result result;
try {
result = this.scanner.next();
} catch (UnknownScannerException e) {
LOG.debug("recovered from " + StringUtils.stringifyException(e));
restart(lastRow);
this.scanner.next(); // skip presumed already mapped row
result = this.scanner.next();
}
if (result != null && result.size() > 0) {
key.set(result.getRow());
lastRow = key.get();
Writables.copyWritable(result, value);
return true;
}
return false;
}
}
/**
* Builds a TableRecordReader. If no TableRecordReader was provided, uses
* the default.
*
* @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
* JobConf, Reporter)
*/
public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
InputSplit split, JobConf job, Reporter reporter)
throws IOException {
TableSplit tSplit = (TableSplit) split;
TableRecordReader trr = this.tableRecordReader;
// if no table record reader was provided use default
if (trr == null) {
trr = new TableRecordReader();
}
trr.setStartRow(tSplit.getStartRow());
trr.setEndRow(tSplit.getEndRow());
trr.setHTable(this.table);
trr.setInputColumns(this.inputColumns);
trr.setRowFilter(this.rowFilter);
trr.init();
return trr;
}
/**
* Calculates the splits that will serve as input for the map tasks.
* <ul>
* Splits are created in number equal to the smallest between numSplits and
* the number of {@link HRegion}s in the table. If the number of splits is
* smaller than the number of {@link HRegion}s then splits are spanned across
* multiple {@link HRegion}s and are grouped the most evenly possible. In the
* case splits are uneven the bigger splits are placed first in the
* {@link InputSplit} array.
*
* @param job the map task {@link JobConf}
* @param numSplits a hint to calculate the number of splits (mapred.map.tasks).
*
* @return the input splits
*
* @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
*/
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
if (this.table == null) {
throw new IOException("No table was provided");
}
byte [][] startKeys = this.table.getStartKeys();
if (startKeys == null || startKeys.length == 0) {
throw new IOException("Expecting at least one region");
}
if (this.inputColumns == null || this.inputColumns.length == 0) {
throw new IOException("Expecting at least one column");
}
int realNumSplits = numSplits > startKeys.length? startKeys.length:
numSplits;
InputSplit[] splits = new InputSplit[realNumSplits];
int middle = startKeys.length / realNumSplits;
int startPos = 0;
for (int i = 0; i < realNumSplits; i++) {
int lastPos = startPos + middle;
lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
String regionLocation = table.getRegionLocation(startKeys[startPos]).
getServerAddress().getHostname();
splits[i] = new TableSplit(this.table.getTableName(),
startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]:
HConstants.EMPTY_START_ROW, regionLocation);
LOG.info("split: " + i + "->" + splits[i]);
startPos = lastPos;
}
return splits;
}
/**
* @param inputColumns the columns to be passed in each {@link Result} to the map task.
*/
protected void setInputColumns(byte [][] inputColumns) {
this.inputColumns = inputColumns;
}
/**
* Allows subclasses to get the {@link HTable}.
*/
protected HTable getHTable() {
return this.table;
}
/**
* Allows subclasses to set the {@link HTable}.
*
* @param table to get the data from
*/
protected void setHTable(HTable table) {
this.table = table;
}
/**
* Allows subclasses to set the {@link TableRecordReader}.
*
* @param tableRecordReader
* to provide other {@link TableRecordReader} implementations.
*/
protected void setTableRecordReader(TableRecordReader tableRecordReader) {
this.tableRecordReader = tableRecordReader;
}
/**
* Allows subclasses to set the {@link RowFilterInterface} to be used.
*
* @param rowFilter
*/
protected void setRowFilter(RowFilterInterface rowFilter) {
this.rowFilter = rowFilter;
}
}

View File

@ -0,0 +1,38 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.Mapper;
/**
* Interface for map tasks reading from an HBase table. Each input record is
* a row key ({@link ImmutableBytesWritable}) paired with its {@link Result}.
*
* @param <K> WritableComparable key class
* @param <V> Writable value class
*/
public interface TableMap<K extends WritableComparable<K>, V extends Writable>
extends Mapper<ImmutableBytesWritable, Result, K, V> {
}

View File

@ -0,0 +1,171 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
/**
* Utility for {@link TableMap} and {@link TableReduce}
*/
@SuppressWarnings("unchecked")
public class TableMapReduceUtil {
/**
* Use this before submitting a TableMap job. It will
* appropriately set up the JobConf.
*
* @param table The table name to read from.
* @param columns The columns to scan.
* @param mapper The mapper class to use.
* @param outputKeyClass The class of the output key.
* @param outputValueClass The class of the output value.
* @param job The current job configuration to adjust.
*/
public static void initTableMapJob(String table, String columns,
Class<? extends TableMap> mapper,
Class<? extends WritableComparable> outputKeyClass,
Class<? extends Writable> outputValueClass, JobConf job) {
job.setInputFormat(TableInputFormat.class);
job.setMapOutputValueClass(outputValueClass);
job.setMapOutputKeyClass(outputKeyClass);
job.setMapperClass(mapper);
FileInputFormat.addInputPaths(job, table);
job.set(TableInputFormat.COLUMN_LIST, columns);
}
/**
* Use this before submitting a TableReduce job. It will
* appropriately set up the JobConf.
*
* @param table The output table.
* @param reducer The reducer class to use.
* @param job The current job configuration to adjust.
* @throws IOException When determining the region count fails.
*/
public static void initTableReduceJob(String table,
Class<? extends TableReduce> reducer, JobConf job)
throws IOException {
initTableReduceJob(table, reducer, job, null);
}
/**
* Use this before submitting a TableReduce job. It will
* appropriately set up the JobConf.
*
* @param table The output table.
* @param reducer The reducer class to use.
* @param job The current job configuration to adjust.
* @param partitioner Partitioner to use. Pass <code>null</code> to use
* default partitioner.
* @throws IOException When determining the region count fails.
*/
public static void initTableReduceJob(String table,
Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
throws IOException {
job.setOutputFormat(TableOutputFormat.class);
job.setReducerClass(reducer);
job.set(TableOutputFormat.OUTPUT_TABLE, table);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
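// When the supplied HRegionPartitioner is used, cap the number of reduce
// tasks at the current region count so every partition maps to a live region.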
if (partitioner == HRegionPartitioner.class) {
job.setPartitionerClass(HRegionPartitioner.class);
HTable outputTable = new HTable(new HBaseConfiguration(job), table);
int regions = outputTable.getRegionsInfo().size();
if (job.getNumReduceTasks() > regions) {
job.setNumReduceTasks(regions);
}
} else if (partitioner != null) {
job.setPartitionerClass(partitioner);
}
}
/**
* Ensures that the given number of reduce tasks for the given job
* configuration does not exceed the number of regions for the given table.
*
* @param table The table to get the region count for.
* @param job The current job configuration to adjust.
* @throws IOException When retrieving the table details fails.
*/
public static void limitNumReduceTasks(String table, JobConf job)
throws IOException {
HTable outputTable = new HTable(new HBaseConfiguration(job), table);
int regions = outputTable.getRegionsInfo().size();
if (job.getNumReduceTasks() > regions)
job.setNumReduceTasks(regions);
}
/**
* Ensures that the given number of map tasks for the given job
* configuration does not exceed the number of regions for the given table.
*
* @param table The table to get the region count for.
* @param job The current job configuration to adjust.
* @throws IOException When retrieving the table details fails.
*/
public static void limitNumMapTasks(String table, JobConf job)
throws IOException {
HTable outputTable = new HTable(new HBaseConfiguration(job), table);
int regions = outputTable.getRegionsInfo().size();
if (job.getNumMapTasks() > regions)
job.setNumMapTasks(regions);
}
/**
* Sets the number of reduce tasks for the given job configuration to the
* number of regions the given table has.
*
* @param table The table to get the region count for.
* @param job The current job configuration to adjust.
* @throws IOException When retrieving the table details fails.
*/
public static void setNumReduceTasks(String table, JobConf job)
throws IOException {
HTable outputTable = new HTable(new HBaseConfiguration(job), table);
int regions = outputTable.getRegionsInfo().size();
job.setNumReduceTasks(regions);
}
/**
* Sets the number of map tasks for the given job configuration to the
* number of regions the given table has.
*
* @param table The table to get the region count for.
* @param job The current job configuration to adjust.
* @throws IOException When retrieving the table details fails.
*/
public static void setNumMapTasks(String table, JobConf job)
throws IOException {
HTable outputTable = new HTable(new HBaseConfiguration(job), table);
int regions = outputTable.getRegionsInfo().size();
job.setNumMapTasks(regions);
}
}

View File

@ -0,0 +1,105 @@
/**
* Copyright 2009 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
/**
* Convert Map/Reduce output and write it to an HBase table
*/
public class TableOutputFormat extends
FileOutputFormat<ImmutableBytesWritable, Put> {
/** JobConf parameter that specifies the output table */
public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
/**
* Writes the reduce output, (ImmutableBytesWritable, Put) pairs,
* to an HBase table
*/
protected static class TableRecordWriter
implements RecordWriter<ImmutableBytesWritable, Put> {
private HTable m_table;
/**
* Instantiate a TableRecordWriter with an HTable instance for writing.
*
* @param table
*/
public TableRecordWriter(HTable table) {
m_table = table;
}
public void close(Reporter reporter)
throws IOException {
m_table.flushCommits();
}
public void write(ImmutableBytesWritable key,
Put value) throws IOException {
m_table.put(new Put(value));
}
}
@Override
@SuppressWarnings("unchecked")
public RecordWriter getRecordWriter(FileSystem ignored,
JobConf job, String name, Progressable progress) throws IOException {
// look up the configured output table and create a client for it
String tableName = job.get(OUTPUT_TABLE);
HTable table = null;
try {
table = new HTable(new HBaseConfiguration(job), tableName);
} catch(IOException e) {
LOG.error(e);
throw e;
}
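// Buffer puts on the client side; the buffered edits are flushed when the
// writer is closed (see TableRecordWriter.close()).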
table.setAutoFlush(false);
return new TableRecordWriter(table);
}
@Override
public void checkOutputSpecs(FileSystem ignored, JobConf job)
throws FileAlreadyExistsException, InvalidJobConfException, IOException {
String tableName = job.get(OUTPUT_TABLE);
if(tableName == null) {
throw new IOException("Must specify table name");
}
}
}

View File

@ -0,0 +1,38 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.Reducer;
/**
* Write a table, sorting by the input key
*
* @param <K> key class
* @param <V> value class
*/
@SuppressWarnings("unchecked")
public interface TableReduce<K extends WritableComparable, V extends Writable>
extends Reducer<K, V, ImmutableBytesWritable, Put> {
}

View File

@ -0,0 +1,112 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.InputSplit;
/**
* A table split corresponds to a key range [low, high)
*/
public class TableSplit implements InputSplit, Comparable<TableSplit> {
private byte [] m_tableName;
private byte [] m_startRow;
private byte [] m_endRow;
private String m_regionLocation;
/** default constructor */
public TableSplit() {
this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
HConstants.EMPTY_BYTE_ARRAY, "");
}
/**
* Constructor
* @param tableName
* @param startRow
* @param endRow
* @param location
*/
public TableSplit(byte [] tableName, byte [] startRow, byte [] endRow,
final String location) {
this.m_tableName = tableName;
this.m_startRow = startRow;
this.m_endRow = endRow;
this.m_regionLocation = location;
}
/** @return table name */
public byte [] getTableName() {
return this.m_tableName;
}
/** @return starting row key */
public byte [] getStartRow() {
return this.m_startRow;
}
/** @return end row key */
public byte [] getEndRow() {
return this.m_endRow;
}
/** @return the region's hostname */
public String getRegionLocation() {
return this.m_regionLocation;
}
public String[] getLocations() {
return new String[] {this.m_regionLocation};
}
public long getLength() {
// Not clear how to obtain this... seems to be used only for sorting splits
return 0;
}
public void readFields(DataInput in) throws IOException {
this.m_tableName = Bytes.readByteArray(in);
this.m_startRow = Bytes.readByteArray(in);
this.m_endRow = Bytes.readByteArray(in);
this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in));
}
public void write(DataOutput out) throws IOException {
Bytes.writeByteArray(out, this.m_tableName);
Bytes.writeByteArray(out, this.m_startRow);
Bytes.writeByteArray(out, this.m_endRow);
Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation));
}
@Override
public String toString() {
return m_regionLocation + ":" +
Bytes.toStringBinary(m_startRow) + "," + Bytes.toStringBinary(m_endRow);
}
public int compareTo(TableSplit o) {
return Bytes.compareTo(getStartRow(), o.getStartRow());
}
}

View File

@ -0,0 +1,267 @@
/*
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
Provides HBase <a href="http://wiki.apache.org/hadoop/HadoopMapReduce">MapReduce</a>
Input/OutputFormats, a table indexing MapReduce job, and utility methods.
<h2>Table of Contents</h2>
<ul>
<li><a href="#classpath">HBase, MapReduce and the CLASSPATH</a></li>
<li><a href="#sink">HBase as MapReduce job data source and sink</a></li>
<li><a href="#examples">Example Code</a></li>
</ul>
<h2><a name="classpath">HBase, MapReduce and the CLASSPATH</a></h2>
<p>MapReduce jobs deployed to a MapReduce cluster do not by default have access
to the HBase configuration under <code>$HBASE_CONF_DIR</code> nor to HBase classes.
You could add <code>hbase-site.xml</code> to $HADOOP_HOME/conf and add
<code>hbase-X.X.X.jar</code> to the <code>$HADOOP_HOME/lib</code> and copy these
changes across your cluster but the cleanest means of adding hbase configuration
and classes to the cluster <code>CLASSPATH</code> is by uncommenting
<code>HADOOP_CLASSPATH</code> in <code>$HADOOP_HOME/conf/hadoop-env.sh</code>
and adding the path to the hbase jar and <code>$HBASE_CONF_DIR</code> directory.
Then copy the amended configuration around the cluster.
You'll probably need to restart the MapReduce cluster if you want it to notice
the new configuration.
</p>
<p>For example, here is how you would amend <code>hadoop-env.sh</code> adding the
built hbase jar, hbase conf, and the <code>PerformanceEvaluation</code> class from
the built hbase test jar to the hadoop <code>CLASSPATH</code>:
<blockquote><pre># Extra Java CLASSPATH elements. Optional.
# export HADOOP_CLASSPATH=
export HADOOP_CLASSPATH=$HBASE_HOME/build/test:$HBASE_HOME/build/hbase-X.X.X.jar:$HBASE_HOME/build/hbase-X.X.X-test.jar:$HBASE_HOME/conf</pre></blockquote>
<p>Expand <code>$HBASE_HOME</code> in the above appropriately to suit your
local environment.</p>
<p>After copying the above change around your cluster, this is how you would run
the PerformanceEvaluation MR job to put up 4 clients (Presumes a ready mapreduce
cluster):
<blockquote><pre>$HADOOP_HOME/bin/hadoop org.apache.hadoop.hbase.PerformanceEvaluation sequentialWrite 4</pre></blockquote>
The PerformanceEvaluation class will be found on the CLASSPATH because you
added <code>$HBASE_HOME/build/test</code> to HADOOP_CLASSPATH.
</p>
<p>Another possibility, if for example you do not have access to hadoop-env.sh or
are unable to restart the hadoop cluster, is bundling the hbase jar into a mapreduce
job jar: add it and its dependencies under the job jar <code>lib/</code>
directory and the hbase conf into a job jar <code>conf/</code> directory.
</p>
<h2><a name="sink">HBase as MapReduce job data source and sink</a></h2>
<p>HBase can be used as a data source, {@link org.apache.hadoop.hbase.mapreduce.TableInputFormat TableInputFormat},
and data sink, {@link org.apache.hadoop.hbase.mapreduce.TableOutputFormat TableOutputFormat}, for MapReduce jobs.
Writing MapReduce jobs that read or write HBase, you'll probably want to subclass
{@link org.apache.hadoop.hbase.mapreduce.TableMap TableMap} and/or
{@link org.apache.hadoop.hbase.mapreduce.TableReduce TableReduce}. See the do-nothing
pass-through classes {@link org.apache.hadoop.hbase.mapreduce.IdentityTableMap IdentityTableMap} and
{@link org.apache.hadoop.hbase.mapreduce.IdentityTableReduce IdentityTableReduce} for basic usage. For a more
involved example, see {@link org.apache.hadoop.hbase.mapreduce.BuildTableIndex BuildTableIndex}
or review the <code>org.apache.hadoop.hbase.mapreduce.TestTableMapReduce</code> unit test.
</p>
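<p>For a rough sketch of the wiring (the <code>MyJob</code>, <code>MyMap</code> and
<code>MyReduce</code> classes, the table names and the column list below are made-up
placeholders), a table-to-table job could be configured like this:</p>
<blockquote><pre>JobConf job = new JobConf(MyJob.class);
job.setJobName("example table copy");
// MyMap implements TableMap&lt;ImmutableBytesWritable, Put> and receives
// (row key, Result) pairs from "sourcetable"
TableMapReduceUtil.initTableMapJob("sourcetable", "contents: anchor:",
MyMap.class, ImmutableBytesWritable.class, Put.class, job);
// MyReduce implements TableReduce&lt;ImmutableBytesWritable, Put>; its Puts
// are written to "targettable" by TableOutputFormat
TableMapReduceUtil.initTableReduceJob("targettable", MyReduce.class, job);
JobClient.runJob(job);</pre></blockquote>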
<p>Running mapreduce jobs that have hbase as source or sink, you'll need to
specify source/sink table and column names in your configuration.</p>
<p>Reading from hbase, the TableInputFormat asks hbase for the list of
regions and makes a map per region or <code>mapred.map.tasks</code> maps,
whichever is smaller (if your job only has two maps, raise
<code>mapred.map.tasks</code> to a number greater than the number of regions).
Maps will run on the adjacent TaskTracker if you are running a TaskTracker
and RegionServer per node.
When writing, it may make sense to avoid the reduce step and write back into
hbase from inside your map. You'd do this when your job does not need the sort
and collation that mapreduce does on the map-emitted data; on insert,
hbase 'sorts' anyway, so there is no point double-sorting (and shuffling data around
your mapreduce cluster) unless you need to. If you do not need the reduce,
you might just have your map emit counts of records processed, just so the
framework's report at the end of your job has meaning, or you can set the number of
reduces to zero and use TableOutputFormat. See the example code
below. If running the reduce step makes sense in your case, it's usually better
to have many reducers so that load is spread across the hbase cluster.</p>
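<p>A minimal map-only configuration might look like the following sketch (the
mapper class, input path and table name are hypothetical; the mapper is assumed
to emit (ImmutableBytesWritable, Put) pairs):</p>
<blockquote><pre>JobConf job = new JobConf(MyUpload.class);
job.setJobName("map-only upload");
FileInputFormat.setInputPaths(job, new Path("/user/me/input"));  // hypothetical input
job.setMapperClass(MyUploadMap.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
job.setNumReduceTasks(0);                      // skip sort/shuffle entirely
job.setOutputFormat(TableOutputFormat.class);
job.set(TableOutputFormat.OUTPUT_TABLE, "uploadtable");
JobClient.runJob(job);</pre></blockquote>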
<p>There is also a new hbase partitioner that will run as many reducers as
there are currently existing regions. The
{@link org.apache.hadoop.hbase.mapreduce.HRegionPartitioner} is suitable
when your table is large and your upload will not greatly
alter the number of existing regions when done; otherwise use the default
partitioner.
</p>
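<p>For example (the reducer and table name here are illustrative), pass the
partitioner class to the four-argument reduce-side helper, which will also cap
the number of reduce tasks at the current region count:</p>
<blockquote><pre>TableMapReduceUtil.initTableReduceJob("targettable", MyReduce.class, job,
HRegionPartitioner.class);</pre></blockquote>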
<h2><a name="examples">Example Code</a></h2>
<h3>Sample Row Counter</h3>
<p>See {@link org.apache.hadoop.hbase.mapreduce.RowCounter}. You should be able to run
it by doing: <code>% ./bin/hadoop jar hbase-X.X.X.jar</code>. This will invoke
the hbase MapReduce Driver class. Select 'rowcounter' from the choice of jobs
offered. You may need to add the hbase conf directory to <code>$HADOOP_HOME/conf/hadoop-env.sh#HADOOP_CLASSPATH</code>
so the rowcounter gets pointed at the right hbase cluster (or, build a new jar
with an appropriate hbase-site.xml built into your job jar).
</p>
<h3>PerformanceEvaluation</h3>
<p>See org.apache.hadoop.hbase.PerformanceEvaluation from hbase src/test. It runs
a mapreduce job to run concurrent clients reading and writing hbase.
</p>
<h3>Sample MR Bulk Uploader</h3>
<p>A students/classes example based on a contribution by Naama Kraus, with lots of
documentation, can be found over in src/examples/mapred.
It's the <code>org.apache.hadoop.hbase.mapreduce.SampleUploader</code> class.
Just copy it under src/java/org/apache/hadoop/hbase/mapreduce to compile and try it
(until we start generating an hbase examples jar). The class reads a data file
from HDFS and, per line, does an upload to HBase using TableReduce.
Read the class comment for specification of inputs, prerequisites, etc.
</p>
<h3>Example to bulk import/load a text file into an HTable
</h3>
<p>Here's a sample program from
<a href="http://www.spicylogic.com/allenday/blog/category/computing/distributed-systems/hadoop/hbase/">Allen Day</a>
that takes an HDFS text file path and an HBase table name as inputs, and loads the contents of the text file to the table
all up in the map phase.
</p>
<blockquote><pre>
package com.spicylogic.hbase;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* Class that adds the parsed line from the input to hbase
* in the map function. Map has no emissions and job
* has no reduce.
*&#x2f;
public class BulkImport implements Tool {
private static final String NAME = "BulkImport";
private Configuration conf;
public static class InnerMap extends MapReduceBase implements Mapper&lt;LongWritable, Text, Text, Text> {
private HTable table;
private HBaseConfiguration HBconf;
public void map(LongWritable key, Text value,
OutputCollector&lt;Text, Text> output, Reporter reporter)
throws IOException {
if ( table == null )
throw new IOException("table is null");
// Split input line on tab character
String [] splits = value.toString().split("\t");
if ( splits.length != 4 )
return;
String rowID = splits[0];
int timestamp = Integer.parseInt( splits[1] );
String colID = splits[2];
String cellValue = splits[3];
reporter.setStatus("Map emitting cell for row='" + rowID +
"', column='" + colID + "', time='" + timestamp + "'");
BatchUpdate bu = new BatchUpdate( rowID );
if ( timestamp > 0 )
bu.setTimestamp( timestamp );
bu.put(colID, cellValue.getBytes());
table.commit( bu );
}
public void configure(JobConf job) {
HBconf = new HBaseConfiguration(job);
try {
table = new HTable( HBconf, job.get("input.table") );
} catch (IOException e) {
// Could not open the table; map() will fail with "table is null"
e.printStackTrace();
}
}
}
public JobConf createSubmittableJob(String[] args) {
JobConf c = new JobConf(getConf(), BulkImport.class);
c.setJobName(NAME);
FileInputFormat.setInputPaths(c, new Path(args[0]));
c.set("input.table", args[1]);
c.setMapperClass(InnerMap.class);
c.setNumReduceTasks(0);
c.setOutputFormat(NullOutputFormat.class);
return c;
}
static int printUsage() {
System.err.println("Usage: " + NAME + " &lt;input> &lt;table_name>");
System.err.println("\twhere &lt;input> is a tab-delimited text file with 4 columns.");
System.err.println("\t\tcolumn 1 = row ID");
System.err.println("\t\tcolumn 2 = timestamp (use a negative value for current time)");
System.err.println("\t\tcolumn 3 = column ID");
System.err.println("\t\tcolumn 4 = cell value");
return -1;
}
public int run(String[] args) throws Exception {
// Make sure there are exactly 2 parameters.
if (args.length != 2) {
return printUsage();
}
JobClient.runJob(createSubmittableJob(args));
return 0;
}
public Configuration getConf() {
return this.conf;
}
public void setConf(final Configuration c) {
this.conf = c;
}
public static void main(String[] args) throws Exception {
int errCode = ToolRunner.run(new Configuration(), new BulkImport(), args);
System.exit(errCode);
}
}
</pre></blockquote>
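<p>Once bundled into a job jar (the jar name and input path here are only
illustrative), it runs like any other Tool-based job:</p>
<blockquote><pre>$HADOOP_HOME/bin/hadoop jar bulkimport.jar com.spicylogic.hbase.BulkImport \
  /user/me/data.tsv mytable</pre></blockquote>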
*/
package org.apache.hadoop.hbase.mapreduce;