From 80e3bf8675eb042fa60d2b70d3477374e7d057b1 Mon Sep 17 00:00:00 2001
From: Zhihong Yu
Date: Wed, 27 Feb 2013 02:16:13 +0000
Subject: [PATCH] HBASE-7747 Import tools should use a combiner to merge Puts
 (Nick Dimiduk)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1450580 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop/hbase/mapreduce/ImportTsv.java          |  4 +-
 .../hadoop/hbase/mapreduce/PutCombiner.java        | 72 +++++++++++++++++++
 .../hbase/mapreduce/TableMapReduceUtil.java        |  4 ++
 .../hadoop/hbase/mapreduce/TestImportTsv.java      |  2 +
 .../hbase/mapreduce/TestTableMapReduce.java        | 13 +++-
 5 files changed, 92 insertions(+), 3 deletions(-)
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index 9efd5b61bb9..63ec90296db 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
-import org.apache.hadoop.hbase.util.Base64;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -37,6 +35,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -283,6 +282,7 @@ public class ImportTsv {
       FileOutputFormat.setOutputPath(job, outputDir);
       job.setMapOutputKeyClass(ImmutableBytesWritable.class);
       job.setMapOutputValueClass(Put.class);
+      job.setCombinerClass(PutCombiner.class);
       HFileOutputFormat.configureIncrementalLoad(job, table);
     } else {
       // No reducers.  Just write straight to table.  Call initTableReducerJob
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java
new file mode 100644
index 00000000000..3dd8696dbc7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java
@@ -0,0 +1,72 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * Combine Puts. Merges Put instances grouped by K into a single
+ * instance.
+ * @see TableMapReduceUtil
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class PutCombiner<K> extends Reducer<K, Put, K, Put> {
+  private static final Log LOG = LogFactory.getLog(PutCombiner.class);
+
+  @Override
+  protected void reduce(K row, Iterable<Put> vals, Context context)
+      throws IOException, InterruptedException {
+
+    int cnt = 0;
+    // There's nothing to say K row is the same as the rowkey
+    // used to construct Puts (value) instances. Thus the map of put.getRow()
+    // to combined Put is necessary.
+    // TODO: would be better if we knew K row and Put rowkey were
+    // identical. Then this whole Put buffering business goes away.
+    // TODO: Could use HeapSize to create an upper bound on the memory size of
+    // the puts map and flush some portion of the content while looping. This
+    // flush could result in multiple Puts for a single rowkey. That is
+    // acceptable because Combiner is run as an optimization and it's not
+    // critical that all Puts are grouped perfectly.
+    Map<byte[], Put> puts = new HashMap<byte[], Put>();
+    for (Put p : vals) {
+      cnt++;
+      if (!puts.containsKey(p.getRow())) {
+        puts.put(p.getRow(), p);
+      } else {
+        puts.get(p.getRow()).getFamilyMap().putAll(p.getFamilyMap());
+      }
+    }
+
+    for (Put p : puts.values()) {
+      context.write(row, p);
+    }
+    LOG.info(String.format("Combined %d Put(s) into %d.", cnt, puts.size()));
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index 3fb952d2bcf..507caecf28a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -133,6 +134,9 @@ public class TableMapReduceUtil {
     if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
     if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
     job.setMapperClass(mapper);
+    if (Put.class.equals(outputValueClass)) {
+      job.setCombinerClass(PutCombiner.class);
+    }
     Configuration conf = job.getConfiguration();
     HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
     conf.set(TableInputFormat.INPUT_TABLE, table);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
index a161ff3a91f..617634ad66f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
@@ -289,6 +289,8 @@ public class TestImportTsv {
       LOG.info("set the hbaseAdmin");
       ImportTsv.createHbaseAdmin(conf);
     }
+    // force use of combiner for testing purposes
+    conf.setInt("min.num.spills.for.combine", 1);
     Job job = ImportTsv.createSubmittableJob(conf, args);
     job.waitForCompletion(false);
     assertTrue(job.isSuccessful());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
index 73b99000d2a..0803011fc37 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
@@ -29,7 +29,9 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.LargeTests;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -128,6 +130,15 @@ public class TestTableMapReduce {
         MULTI_REGION_TABLE_NAME));
   }
 
+  @Test
+  public void testCombiner()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    Configuration conf = new Configuration(UTIL.getConfiguration());
+    // force use of combiner for testing purposes
+    conf.setInt("min.num.spills.for.combine", 1);
+    runTestOnTable(new HTable(conf, MULTI_REGION_TABLE_NAME));
+  }
+
   private void runTestOnTable(HTable table)
   throws IOException, InterruptedException, ClassNotFoundException {
     Job job = null;