HBASE-7747 Import tools should use a combiner to merge Puts (Nick Dimiduk)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1450580 13f79535-47bb-0310-9956-ffa450edef68
Author: Zhihong Yu
Date:   2013-02-27 02:16:13 +00:00
Parent: 4d356e24e1
Commit: 80e3bf8675

5 changed files with 92 additions and 3 deletions
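
For context on what "merging Puts" buys: a combiner runs map-side over spilled
mapper output, so collapsing several Puts for the same row into one shrinks what
is shuffled to the reducers. A minimal sketch of the merge itself, against the
client API of this era (the row, families, and values are illustrative only):

    byte[] row = Bytes.toBytes("row-1");

    // Two mapper outputs for the same row, touching different column families.
    Put first = new Put(row);
    first.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("1"));
    Put second = new Put(row);
    second.add(Bytes.toBytes("cf2"), Bytes.toBytes("b"), Bytes.toBytes("2"));

    // The merge PutCombiner performs: fold second's family map into first's,
    // so one Put crosses the shuffle instead of two. putAll suffices here
    // because the families are disjoint; when two Puts touch the same family
    // the merge must go family by family, as in PutCombiner below.
    first.getFamilyMap().putAll(second.getFamilyMap());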

org/apache/hadoop/hbase/mapreduce/ImportTsv.java

@@ -18,8 +18,6 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
-import org.apache.hadoop.hbase.util.Base64;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -37,6 +35,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -283,6 +282,7 @@ public class ImportTsv {
       FileOutputFormat.setOutputPath(job, outputDir);
       job.setMapOutputKeyClass(ImmutableBytesWritable.class);
       job.setMapOutputValueClass(Put.class);
+      job.setCombinerClass(PutCombiner.class);
       HFileOutputFormat.configureIncrementalLoad(job, table);
     } else {
       // No reducers. Just write straight to table. Call initTableReducerJob

org/apache/hadoop/hbase/mapreduce/PutCombiner.java (new file)

@@ -0,0 +1,72 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Reducer;

/**
* Combine Puts. Merges Put instances grouped by <code>K</code> into a single
* instance.
* @see TableMapReduceUtil
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class PutCombiner<K> extends Reducer<K, Put, K, Put> {
  private static final Log LOG = LogFactory.getLog(PutCombiner.class);

  @Override
  protected void reduce(K row, Iterable<Put> vals, Context context)
      throws IOException, InterruptedException {
    int cnt = 0;
    // There's no guarantee that <code>K row</code> equals the rowkey used to
    // construct the Put (value) instances, so a map from put.getRow() to the
    // combined Put is necessary.
    // TODO: would be better if we knew <code>K row</code> and the Put rowkey
    // were identical. Then this whole Put buffering business goes away.
    // TODO: Could use HeapSize to create an upper bound on the memory size of
    // the puts map and flush some portion of the content while looping. This
    // flush could result in multiple Puts for a single rowkey. That is
    // acceptable because the combiner is run as an optimization and it's not
    // critical that all Puts are grouped perfectly.
    //
    // A HashMap keyed on byte[] would compare keys by reference, never by
    // content, so order the map by Bytes.BYTES_COMPARATOR instead.
    Map<byte[], Put> puts = new TreeMap<byte[], Put>(Bytes.BYTES_COMPARATOR);
    for (Put p : vals) {
      cnt++;
      Put combined = puts.get(p.getRow());
      if (combined == null) {
        puts.put(p.getRow(), p);
      } else {
        // Merge family by family; a blind putAll would replace, and thereby
        // drop, the KeyValues already accumulated for a family both Puts
        // touch.
        for (Map.Entry<byte[], List<KeyValue>> e : p.getFamilyMap().entrySet()) {
          List<KeyValue> existing = combined.getFamilyMap().get(e.getKey());
          if (existing == null) {
            combined.getFamilyMap().put(e.getKey(), e.getValue());
          } else {
            existing.addAll(e.getValue());
          }
        }
      }
    }
    for (Put p : puts.values()) {
      context.write(row, p);
    }
    LOG.info(String.format("Combined %d Put(s) into %d.", cnt, puts.size()));
  }
}
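
A note on the contract this class has to satisfy: Hadoop may run a combiner
zero, one, or many times per key, so its input and output types must match and
repeated application must be harmless; merging the family maps of Puts for one
row meets that bar. A sketch of wiring it into an arbitrary job (the mapper
name is a placeholder, not part of this patch, and `conf` is assumed in scope):

    Job job = new Job(conf, "puts-with-combiner");
    job.setMapperClass(MyPutEmittingMapper.class); // hypothetical mapper emitting <ImmutableBytesWritable, Put>
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    // Reducer<K, Put, K, Put>: input and output types agree, as required.
    job.setCombinerClass(PutCombiner.class);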

org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -133,6 +134,9 @@ public class TableMapReduceUtil {
     if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
     if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
     job.setMapperClass(mapper);
+    if (Put.class.equals(outputValueClass)) {
+      job.setCombinerClass(PutCombiner.class);
+    }
     Configuration conf = job.getConfiguration();
     HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
     conf.set(TableInputFormat.INPUT_TABLE, table);
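
The effect of this hunk: any job configured through
TableMapReduceUtil.initTableMapperJob whose map output value class is Put now
gets PutCombiner installed automatically, with no change at call sites. A
sketch of such a call site, where the table name, scan settings, and mapper are
illustrative stand-ins:

    Scan scan = new Scan();
    Job job = new Job(conf, "scan-and-emit-puts");
    TableMapReduceUtil.initTableMapperJob(
        "source_table",               // hypothetical table name
        scan,
        MyTableMapper.class,          // hypothetical TableMapper emitting Puts
        ImmutableBytesWritable.class, // outputKeyClass
        Put.class,                    // outputValueClass: triggers the combiner
        job);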

org/apache/hadoop/hbase/mapreduce/TestImportTsv.java

@@ -289,6 +289,8 @@ public class TestImportTsv {
       LOG.info("set the hbaseAdmin");
       ImportTsv.createHbaseAdmin(conf);
     }
+    // force use of combiner for testing purposes
+    conf.setInt("min.num.spills.for.combine", 1);
     Job job = ImportTsv.createSubmittableJob(conf, args);
     job.waitForCompletion(false);
     assertTrue(job.isSuccessful());
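
Why the knob: min.num.spills.for.combine sets how many map-side spill files
must exist before Hadoop re-applies the combiner during the final merge (the
default is 3), so the test pins it to 1 to give PutCombiner every opportunity
to run on a small input. A hedged way to confirm the combiner actually ran,
not part of this patch, is a counter check after the job completes:

    // Hypothetical assertion: COMBINE_INPUT_RECORDS is non-zero only when a
    // combiner executed. TaskCounter is the Hadoop 2 name; Hadoop 1 exposes
    // the same counter as org.apache.hadoop.mapred.Task.Counter.
    long combineInputs = job.getCounters()
        .findCounter(org.apache.hadoop.mapreduce.TaskCounter.COMBINE_INPUT_RECORDS)
        .getValue();
    assertTrue("combiner did not run", combineInputs > 0);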

org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java

@@ -29,7 +29,9 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.LargeTests;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -128,6 +130,15 @@ public class TestTableMapReduce {
         MULTI_REGION_TABLE_NAME));
   }
 
+  @Test
+  public void testCombiner()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    Configuration conf = new Configuration(UTIL.getConfiguration());
+    // force use of combiner for testing purposes
+    conf.setInt("min.num.spills.for.combine", 1);
+    runTestOnTable(new HTable(conf, MULTI_REGION_TABLE_NAME));
+  }
+
   private void runTestOnTable(HTable table)
   throws IOException, InterruptedException, ClassNotFoundException {
     Job job = null;