From b5da2444121cd5ebb649b1d52e018b440c3e6001 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Fri, 24 Apr 2009 22:43:03 +0000 Subject: [PATCH] HBASE-1287 Partitioner class not used in TableMapReduceUtil.initTableReduceJob() git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@768431 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 2 + .../hadoop/hbase/mapred/RowCounter.java | 3 + .../hbase/mapred/TableMapReduceUtil.java | 104 ++++++++++++++---- 3 files changed, 90 insertions(+), 19 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 3ba77cbb19d..7e244017094 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -87,6 +87,8 @@ Release 0.20.0 - Unreleased HBASE-1292 php thrift's getRow() would throw an exception if the row does not exist (Rong-en Fan via Stack) HBASE-1340 Fix new javadoc warnings (Evgeny Ryabitskiy via Stack) + HBASE-1287 Partitioner class not used in TableMapReduceUtil.initTableReduceJob() + (Lars George and Billy Pearson via Stack) IMPROVEMENTS HBASE-1089 Add count of regions on filesystem to master UI; add percentage diff --git a/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java b/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java index 1550ffa36c7..3ddfad469b4 100644 --- a/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java +++ b/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java @@ -46,6 +46,9 @@ public class RowCounter extends Configured implements Tool { // Name of this 'program' static final String NAME = "rowcounter"; + /** + * Mapper that runs the count. 
+ */ static class RowCounterMapper implements TableMap { private static enum Counters {ROWS} diff --git a/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java b/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java index 866a20f8a0c..390c651edb4 100644 --- a/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java +++ b/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java @@ -35,16 +35,17 @@ import org.apache.hadoop.mapred.JobConf; */ @SuppressWarnings("unchecked") public class TableMapReduceUtil { + /** * Use this before submitting a TableMap job. It will * appropriately set up the JobConf. * - * @param table table name - * @param columns columns to scan - * @param mapper mapper class - * @param outputKeyClass - * @param outputValueClass - * @param job job configuration + * @param table The table name to read from. + * @param columns The columns to scan. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job configuration to adjust. */ public static void initTableMapJob(String table, String columns, Class mapper, @@ -63,10 +64,10 @@ public class TableMapReduceUtil { * Use this before submitting a TableReduce job. It will * appropriately set up the JobConf. * - * @param table - * @param reducer - * @param job - * @throws IOException + * @param table The output table. + * @param reducer The reducer class to use. + * @param job The current job configuration to adjust. + * @throws IOException When determining the region count fails. */ public static void initTableReduceJob(String table, Class reducer, JobConf job) @@ -78,12 +79,12 @@ public class TableMapReduceUtil { * Use this before submitting a TableReduce job. It will * appropriately set up the JobConf. * - * @param table - * @param reducer - * @param job - * @param partitioner Partitioner to use. Pass null to use default - * partitioner. 
-   * @throws IOException
+   * @param table The output table.
+   * @param reducer The reducer class to use.
+   * @param job The current job configuration to adjust.
+   * @param partitioner Partitioner to use. Pass null to use
+   * default partitioner.
+   * @throws IOException When determining the region count fails.
    */
   public static void initTableReduceJob(String table,
     Class reducer, JobConf job, Class partitioner)
@@ -93,13 +94,78 @@ public class TableMapReduceUtil {
     job.set(TableOutputFormat.OUTPUT_TABLE, table);
     job.setOutputKeyClass(ImmutableBytesWritable.class);
     job.setOutputValueClass(BatchUpdate.class);
-    if (partitioner != null) {
+    if (partitioner == HRegionPartitioner.class) {
       job.setPartitionerClass(HRegionPartitioner.class);
       HTable outputTable = new HTable(new HBaseConfiguration(job), table);
       int regions = outputTable.getRegionsInfo().size();
-      if (job.getNumReduceTasks() > regions){
-        job.setNumReduceTasks(outputTable.getRegionsInfo().size());
+      if (job.getNumReduceTasks() > regions) {
+        job.setNumReduceTasks(regions);
       }
+    } else if (partitioner != null) {
+      job.setPartitionerClass(partitioner);
     }
   }
+
+  /**
+   * Ensures that the given number of reduce tasks for the given job
+   * configuration does not exceed the number of regions for the given table.
+   *
+   * @param table The table to get the region count for.
+   * @param job The current job configuration to adjust.
+   * @throws IOException When retrieving the table details fails.
+   */
+  public static void limitNumReduceTasks(String table, JobConf job)
+  throws IOException {
+    HTable outputTable = new HTable(new HBaseConfiguration(job), table);
+    int regions = outputTable.getRegionsInfo().size();
+    if (job.getNumReduceTasks() > regions)
+      job.setNumReduceTasks(regions);
+  }
+
+  /**
+   * Ensures that the given number of map tasks for the given job
+   * configuration does not exceed the number of regions for the given table.
+   *
+   * @param table The table to get the region count for.
+   * @param job The current job configuration to adjust.
+   * @throws IOException When retrieving the table details fails.
+   */
+  public static void limitNumMapTasks(String table, JobConf job)
+  throws IOException {
+    HTable outputTable = new HTable(new HBaseConfiguration(job), table);
+    int regions = outputTable.getRegionsInfo().size();
+    if (job.getNumMapTasks() > regions)
+      job.setNumMapTasks(regions);
+  }
+
+  /**
+   * Sets the number of reduce tasks for the given job configuration to the
+   * number of regions the given table has.
+   *
+   * @param table The table to get the region count for.
+   * @param job The current job configuration to adjust.
+   * @throws IOException When retrieving the table details fails.
+   */
+  public static void setNumReduceTasks(String table, JobConf job)
+  throws IOException {
+    HTable outputTable = new HTable(new HBaseConfiguration(job), table);
+    int regions = outputTable.getRegionsInfo().size();
+    job.setNumReduceTasks(regions);
+  }
+
+  /**
+   * Sets the number of map tasks for the given job configuration to the
+   * number of regions the given table has.
+   *
+   * @param table The table to get the region count for.
+   * @param job The current job configuration to adjust.
+   * @throws IOException When retrieving the table details fails.
+   */
+  public static void setNumMapTasks(String table, JobConf job)
+  throws IOException {
+    HTable outputTable = new HTable(new HBaseConfiguration(job), table);
+    int regions = outputTable.getRegionsInfo().size();
+    job.setNumMapTasks(regions);
+  }
+
 }
\ No newline at end of file