HBASE-1963 Output to multiple tables from Hadoop MR without use of HTable

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@834554 13f79535-47bb-0310-9956-ffa450edef68
Andrew Kyle Purtell 2009-11-10 17:20:13 +00:00
parent b13b990a64
commit ca1f068c57
6 changed files with 365 additions and 11 deletions

View File

@ -76,11 +76,12 @@ Release 0.21.0 - Unreleased
Purtell)
HBASE-1916 FindBugs and javac warnings cleanup
HBASE-1908 ROOT not reassigned if only one regionserver left
- HBASE-1915 HLog.sync is called way too often, needs to be only called 1x per
- RPC
+ HBASE-1915 HLog.sync is called way too often, needs to be only called one
+ time per RPC
HBASE-1777 column length is not checked before saved to memstore
HBASE-1925 IllegalAccessError: Has not been initialized (getMaxSequenceId)
- HBASE-1929 If hbase-default.xml is not in CP, zk session timeout is 10 secs!
+ HBASE-1929 If hbase-default.xml is not in CP, zk session timeout is 10
+ seconds!
HBASE-1927 Scanners not closed properly in certain circumstances
HBASE-1934 NullPointerException in ClientScanner (Andrew Purtell via Stack)
HBASE-1946 Unhandled exception at regionserver (Dmitriy Lyfar via Stack)
@ -88,7 +89,8 @@ Release 0.21.0 - Unreleased
(Andrew McCall via Clint Morgan and Stack)
HBASE-1953 Overhaul of overview.html (html fixes, typos, consistency) -
no content changes (Lars Francke via Stack)
- HBASE-1954 Transactional scans do not see newest put (Clint Morgan via Stack)
+ HBASE-1954 Transactional scans do not see newest put (Clint Morgan via
+ Stack)
HBASE-1919 code: HRS.delete seems to ignore exceptions it shouldnt
HBASE-1951 Stack overflow when calling HTable.checkAndPut()
when deleting a lot of values
@ -101,9 +103,10 @@ Release 0.21.0 - Unreleased
not usable) if the designated regionServer dies before the
assignment is complete (Yannis Pavlidis via Stack)
HBASE-1962 Bulk loading script makes regions incorrectly (loadtable.rb)
- HBASE-1966 Apply the fix from site/ to remove the forrest dependency on java5
- HBASE-1967 [Transactional] client.TestTransactions.testPutPutScan fails sometimes
- Temporary fix
+ HBASE-1966 Apply the fix from site/ to remove the forrest dependency on
+ Java 5
+ HBASE-1967 [Transactional] client.TestTransactions.testPutPutScan fails
+ sometimes -- Temporary fix
IMPROVEMENTS
HBASE-1760 Cleanup TODOs in HTable
@ -164,7 +167,8 @@ Release 0.21.0 - Unreleased
HBASE-1756 Refactor HLog (changing package first)
HBASE-1926 Remove unused xmlenc jar from trunk
HBASE-1936 HLog group commit
- HBASE-1921 When the Master's session times out and there's only one, cluster is wedged
+ HBASE-1921 When the Master's session times out and there's only one,
+ cluster is wedged
HBASE-1942 Update hadoop jars in trunk; update to r831142
HBASE-1943 Remove AgileJSON; unused
HBASE-1944 Add a "deferred log flush" attribute to HTD
@ -174,8 +178,10 @@ Release 0.21.0 - Unreleased
HBASE-1829 Make use of start/stop row in TableInputFormat
(Lars George via Stack)
HBASE-1867 Tool to regenerate an hbase table from the data files
- HBASE-1904 Add tutorilal for installing HBase on Windows using Cygwin as
+ HBASE-1904 Add tutorial for installing HBase on Windows using Cygwin as
a test and development environment (Wim Van Leuven via Stack)
+ HBASE-1963 Output to multiple tables from Hadoop MR without use of HTable
+ (Kevin Peterson via Andrew Purtell)
OPTIMIZATIONS
HBASE-410 [testing] Speed up the test suite

View File

@ -1 +1,11 @@
- Example code. Includes thrift clients and mapreduce uploader examples.
+ Example code.
+ * src/examples/thrift
+ Examples for interacting with HBase via Thrift from C++, PHP, Python and Ruby.
+ * org.apache.hadoop.hbase.mapreduce.SampleUploader
+ Demonstrates uploading data from text files (presumably stored in HDFS) to HBase.
+ * org.apache.hadoop.hbase.mapreduce.IndexBuilder
+ Demonstrates map/reduce with a table as the source and other tables as the sink.
+ As of 0.20 there is no ant target for building the examples. You can easily build
+ the Java examples by copying them to the right location in the main source hierarchy.

View File

@ -0,0 +1,15 @@
# Set up sample data for IndexBuilder example
create "people", "attributes"
create "people-email", "INDEX"
create "people-phone", "INDEX"
create "people-name", "INDEX"
[["1", "jenny", "jenny@example.com", "867-5309"],
["2", "alice", "alice@example.com", "555-1234"],
["3", "kevin", "kevinpet@example.com", "555-1212"]].each do |fields|
(id, name, email, phone) = *fields
put "people", id, "attributes:name", name
put "people", id, "attributes:email", email
put "people", id, "attributes:phone", phone
end

View File

@ -0,0 +1,154 @@
/**
* Copyright 2009 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.GenericOptionsParser;
/**
* Example map/reduce job to construct index tables that can be used to quickly
* find a row based on the value of a column. It demonstrates:
* <ul>
* <li>Using TableInputFormat and TableMapReduceUtil to use an HTable as input
* to a map/reduce job.</li>
* <li>Passing values from main method to children via the configuration.</li>
* <li>Using MultiTableOutputFormat to output to multiple tables from a
* map/reduce job.</li>
* <li>A real use case of building a secondary index over a table.</li>
* </ul>
*
* <h3>Usage</h3>
*
* <p>
* Modify ${HADOOP_HOME}/conf/hadoop-env.sh to include the hbase jar, the
* zookeeper jar, the examples output directory, and the hbase conf directory in
* HADOOP_CLASSPATH, and then run
* <tt><strong>bin/hadoop org.apache.hadoop.hbase.mapreduce.IndexBuilder TABLE_NAME COLUMN_FAMILY ATTR [ATTR ...]</strong></tt>
* </p>
*
* <p>
* To run with the sample data provided in index-builder-setup.rb, use the
* arguments <strong><tt>people attributes name email phone</tt></strong>.
* </p>
*
* <p>
* This code was written against HBase 0.21 trunk.
* </p>
*/
public class IndexBuilder {
/** the column family containing the indexed row key */
public static final byte[] INDEX_COLUMN = Bytes.toBytes("INDEX");
/** the qualifier containing the indexed row key */
public static final byte[] INDEX_QUALIFIER = Bytes.toBytes("ROW");
/**
* Internal Mapper to be run by Hadoop.
*/
public static class Map extends
Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Writable> {
private byte[] family;
private HashMap<byte[], ImmutableBytesWritable> indexes;
@Override
protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
throws IOException, InterruptedException {
for(java.util.Map.Entry<byte[], ImmutableBytesWritable> index : indexes.entrySet()) {
byte[] qualifier = index.getKey();
ImmutableBytesWritable tableName = index.getValue();
byte[] value = result.getValue(family, qualifier);
if (value != null) {
// original: row 123 attribute:phone 555-1212
// index: row 555-1212 INDEX:ROW 123
Put put = new Put(value);
put.add(INDEX_COLUMN, INDEX_QUALIFIER, rowKey.get());
context.write(tableName, put);
}
}
}
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
Configuration configuration = context.getConfiguration();
String tableName = configuration.get("index.tablename");
String[] fields = configuration.getStrings("index.fields");
String familyName = configuration.get("index.familyname");
family = Bytes.toBytes(familyName);
indexes = new HashMap<byte[], ImmutableBytesWritable>();
for(String field : fields) {
// if the table is "people" and the field to index is "email", then the
// index table will be called "people-email"
indexes.put(Bytes.toBytes(field),
new ImmutableBytesWritable(Bytes.toBytes(tableName + "-" + field)));
}
}
}
/**
* Job configuration.
*/
public static Job configureJob(Configuration conf, String [] args)
throws IOException {
String tableName = args[0];
String columnFamily = args[1];
System.out.println("****" + tableName);
conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(new Scan()));
conf.set(TableInputFormat.INPUT_TABLE, tableName);
conf.set("index.tablename", tableName);
conf.set("index.familyname", columnFamily);
String[] fields = new String[args.length - 2];
for(int i = 0; i < fields.length; i++) {
fields[i] = args[i + 2];
}
conf.setStrings("index.fields", fields);
conf.set("index.familyname", "attributes");
Job job = new Job(conf, tableName);
job.setJarByClass(IndexBuilder.class);
job.setMapperClass(Map.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(TableInputFormat.class);
job.setOutputFormatClass(MultiTableOutputFormat.class);
return job;
}
public static void main(String[] args) throws Exception {
HBaseConfiguration conf = new HBaseConfiguration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if(otherArgs.length < 3) {
System.err.println("Only " + otherArgs.length + " arguments supplied, required: 3");
System.err.println("Usage: IndexBuilder <TABLE_NAME> <COLUMN_FAMILY> <ATTR> [<ATTR> ...]");
System.exit(-1);
}
Job job = configureJob(conf, otherArgs);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
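
A quick sketch of how a client might use the index tables that IndexBuilder produces. It assumes the sample data from index-builder-setup.rb has been loaded and the job has been run with the arguments "people attributes name email phone", so that "people-email" holds rows keyed by email with the original row key under INDEX:ROW. The class name IndexLookupExample is invented for illustration and the snippet is untested against this exact trunk revision.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class IndexLookupExample {
  public static void main(String[] args) throws Exception {
    HBaseConfiguration conf = new HBaseConfiguration();
    // Step 1: look up the indexed value in the secondary index table.
    HTable emailIndex = new HTable(conf, "people-email");
    Result indexRow = emailIndex.get(new Get(Bytes.toBytes("alice@example.com")));
    byte[] originalRow = indexRow.getValue(Bytes.toBytes("INDEX"), Bytes.toBytes("ROW"));
    // Step 2: fetch the full record from the original table using the recovered key.
    HTable people = new HTable(conf, "people");
    Result person = people.get(new Get(originalRow));
    System.out.println(Bytes.toString(
        person.getValue(Bytes.toBytes("attributes"), Bytes.toBytes("name"))));
  }
}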

View File

@ -0,0 +1,167 @@
/**
* Copyright 2009 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
* <p>
* Hadoop output format that writes to one or more HBase tables. The key is
* taken to be the table name while the output value <em>must</em> be either a
* {@link Put} or a {@link Delete} instance. All tables must already exist, and
* all Puts and Deletes must reference only valid column families.
* </p>
*
* <p>
* Write-ahead logging (HLog) for Puts can be disabled by setting
* {@link #WAL_PROPERTY} to {@link #WAL_OFF}. Default value is {@link #WAL_ON}.
* Note that disabling write-ahead logging is only appropriate for jobs where
* loss of data due to region server failure can be tolerated (for example,
* because it is easy to rerun a bulk import).
* </p>
*
* <p>
* See also the {@link IndexBuilder} example.
* </p>
*/
public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Writable> {
/** Set this to {@link #WAL_OFF} to turn off write-ahead logging (HLog) */
public static final String WAL_PROPERTY = "hbase.mapreduce.multitableoutputformat.wal";
/** Property value to use write-ahead logging */
public static final boolean WAL_ON = true;
/** Property value to disable write-ahead logging */
public static final boolean WAL_OFF = false;
/**
* Record writer for outputting to multiple HTables.
*/
protected static class MultiTableRecordWriter extends
RecordWriter<ImmutableBytesWritable, Writable> {
private static final Log LOG = LogFactory.getLog(MultiTableRecordWriter.class);
Map<ImmutableBytesWritable, HTable> tables;
HBaseConfiguration config;
boolean useWriteAheadLogging;
/**
* @param config
* the HBaseConfiguration to use
* @param useWriteAheadLogging
* whether to use write ahead logging. This can be turned off (
* <tt>false</tt>) to improve performance when bulk loading data.
*/
public MultiTableRecordWriter(HBaseConfiguration config,
boolean useWriteAheadLogging) {
LOG.debug("Created new MultiTableRecordReader with WAL "
+ (useWriteAheadLogging ? "on" : "off"));
this.tables = new HashMap<ImmutableBytesWritable, HTable>();
this.config = config;
this.useWriteAheadLogging = useWriteAheadLogging;
}
/**
* @param tableName
* the name of the table, as a string
* @return the named table
* @throws IOException
* if there is a problem opening a table
*/
HTable getTable(ImmutableBytesWritable tableName) throws IOException {
if (!tables.containsKey(tableName)) {
LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get())+ "\" for writing");
HTable table = new HTable(config, tableName.get());
table.setAutoFlush(false);
tables.put(tableName, table);
}
return tables.get(tableName);
}
@Override
public void close(TaskAttemptContext context) throws IOException {
for (HTable table : tables.values()) {
table.flushCommits();
}
}
/**
* Writes an action (Put or Delete) to the specified table.
*
* @param tableName
* the table being updated.
* @param action
* the update, either a put or a delete.
* @throws IllegalArgumentException
* if the action is not a put or a delete.
*/
@Override
public void write(ImmutableBytesWritable tableName, Writable action) throws IOException {
HTable table = getTable(tableName);
// The actions are not immutable, so we defensively copy them
if (action instanceof Put) {
Put put = new Put((Put) action);
put.setWriteToWAL(useWriteAheadLogging);
table.put(put);
} else if (action instanceof Delete) {
Delete delete = new Delete((Delete) action);
table.delete(delete);
} else
throw new IllegalArgumentException(
"action must be either Delete or Put");
}
}
@Override
public void checkOutputSpecs(JobContext context) throws IOException,
InterruptedException {
// we can't know ahead of time if it's going to blow up when the user
// passes a table name that doesn't exist, so nothing useful here.
}
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
throws IOException, InterruptedException {
return new TableOutputCommitter();
}
@Override
public RecordWriter<ImmutableBytesWritable, Writable> getRecordWriter(TaskAttemptContext context)
throws IOException, InterruptedException {
Configuration configuration = context.getConfiguration();
return new MultiTableRecordWriter(new HBaseConfiguration(configuration),
configuration.getBoolean(WAL_PROPERTY, WAL_ON));
}
}
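
To make the contract described in the class javadoc concrete (the map output key names the destination table, the value must be a Put or a Delete), here is a hypothetical map-only job that routes lines of text input to one of two assumed tables, "events" and "events-errors", each with a column family "f". The table names, family, and class names are invented for illustration and are not part of this patch; the WAL_PROPERTY line shows how a rerunnable bulk import could trade durability for speed, as the javadoc suggests.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class MultiTableExample {
  private static final ImmutableBytesWritable EVENTS =
      new ImmutableBytesWritable(Bytes.toBytes("events"));
  private static final ImmutableBytesWritable ERRORS =
      new ImmutableBytesWritable(Bytes.toBytes("events-errors"));

  /** Routes each input line to one of the two tables based on its prefix. */
  public static class RouterMapper
      extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      String text = line.toString();
      Put put = new Put(Bytes.toBytes(String.valueOf(offset.get())));
      put.add(Bytes.toBytes("f"), Bytes.toBytes("line"), Bytes.toBytes(text));
      // The output key chooses the destination table for this Put.
      context.write(text.startsWith("ERROR") ? ERRORS : EVENTS, put);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new HBaseConfiguration();
    // Optional: disable the write-ahead log for a rerunnable bulk import.
    conf.setBoolean(MultiTableOutputFormat.WAL_PROPERTY, MultiTableOutputFormat.WAL_OFF);
    Job job = new Job(conf, "multi-table-example");
    job.setJarByClass(MultiTableExample.class);
    job.setMapperClass(RouterMapper.class);
    job.setNumReduceTasks(0);
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    job.setOutputFormatClass(MultiTableOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Both destination tables and the "f" family must already exist, since the empty checkOutputSpecs above does not verify them.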

View File

@ -67,7 +67,9 @@ directory and the hbase conf into the job jars top-level directory.
<h2><a name="sink">HBase as MapReduce job data source and sink</a></h2>
<p>HBase can be used as a data source, {@link org.apache.hadoop.hbase.mapreduce.TableInputFormat TableInputFormat},
- and data sink, {@link org.apache.hadoop.hbase.mapreduce.TableOutputFormat TableOutputFormat}, for MapReduce jobs.
+ and data sink, {@link org.apache.hadoop.hbase.mapreduce.TableOutputFormat TableOutputFormat}
+ or {@link org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat MultiTableOutputFormat},
+ for MapReduce jobs.
Writing MapReduce jobs that read or write HBase, you'll probably want to subclass
{@link org.apache.hadoop.hbase.mapreduce.TableMapper TableMapper} and/or
{@link org.apache.hadoop.hbase.mapreduce.TableReducer TableReducer}. See the do-nothing
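
As a minimal sketch of the subclassing described in that paragraph, the following hypothetical job counts the rows of a table with a TableMapper wired up through TableMapReduceUtil. The class names and counter group are invented for illustration; the job takes the table name as its only argument.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class RowCountExample {
  /** Counts rows on the map side and emits no output records. */
  public static class CountMapper extends TableMapper<NullWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable row, Result values, Context context)
        throws IOException, InterruptedException {
      context.getCounter("example", "rows").increment(1);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new HBaseConfiguration();
    Job job = new Job(conf, "row-count-" + args[0]);
    job.setJarByClass(RowCountExample.class);
    // Sets TableInputFormat, serializes the Scan, and wires in the mapper.
    TableMapReduceUtil.initTableMapperJob(args[0], new Scan(), CountMapper.class,
        NullWritable.class, NullWritable.class, job);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(NullOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}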