diff --git a/CHANGES.txt b/CHANGES.txt
index 3865c1fa97e..8866ac10092 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -199,6 +199,10 @@ Release 0.21.0 - Unreleased
   HBASE-1961  HBase EC2 scripts
   HBASE-1971  Unit test the full WAL replay cycle

+  NEW FEATURES
+  HBASE-1901  "General" partitioner for "hbase-48" bulk (behind the api, write
+              hfiles direct) uploader
+
  OPTIMIZATIONS
   HBASE-410   [testing] Speed up the test suite

diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java b/src/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java
new file mode 100644
index 00000000000..5c7af50a711
--- /dev/null
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java
@@ -0,0 +1,98 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * A partitioner that takes start and end keys and uses Bytes.split to figure
+ * which reducer a key belongs to.  Pass the start and end
+ * keys in the Configuration using hbase.simpletotalorder.start
+ * and hbase.simpletotalorder.end.  The end key needs to be
+ * exclusive; i.e. one larger than the biggest key in your key space.
+ * You may be surprised at how this class partitions the space; it may not
+ * align with preconceptions; e.g. a start key of zero and an end key of 100
+ * divided in ten will not make regions whose range is 0-10, 10-20, and so on.
+ * Make your own partitioner if you need the region spacing to come out a
+ * particular way.
+ * @param <VALUE>
+ * @see #START
+ * @see #END
+ */
+public class SimpleTotalOrderPartitioner<VALUE> extends Partitioner<ImmutableBytesWritable, VALUE>
+implements Configurable {
+  private final Log LOG = LogFactory.getLog(this.getClass());
+  public static final String START = "hbase.simpletotalorder.start";
+  public static final String END = "hbase.simpletotalorder.end";
+  private Configuration c;
+  private byte [] startkey;
+  private byte [] endkey;
+  private byte [][] splits;
+  private int lastReduces = -1;
+
+  @Override
+  public int getPartition(final ImmutableBytesWritable key, final VALUE value,
+      final int reduces) {
+    if (reduces == 1) return 0;
+    if (this.lastReduces != reduces) {
+      this.splits = Bytes.split(this.startkey, this.endkey, reduces - 1);
+      for (int i = 0; i < splits.length; i++) {
+        LOG.info(Bytes.toString(splits[i]));
+      }
+      // Remember the reduce count so splits are not recomputed on every call.
+      this.lastReduces = reduces;
+    }
+    int pos = Bytes.binarySearch(this.splits, key.get(), key.getOffset(),
+      key.getLength(), Bytes.BYTES_RAWCOMPARATOR);
+    // Below code is from hfile index search.
+    if (pos < 0) {
+      pos++;
+      pos *= -1;
+      if (pos == 0) {
+        // falls before the beginning of the file.
+        throw new RuntimeException("Key outside start/stop range: " +
+          key.toString());
+      }
+      pos--;
+    }
+    return pos;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.c;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.c = conf;
+    String startStr = this.c.get(START);
+    String endStr = this.c.get(END);
+    LOG.info("startkey=" + startStr + ", endkey=" + endStr);
+    this.startkey = Bytes.toBytes(startStr);
+    this.endkey = Bytes.toBytes(endStr);
+  }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/package-info.java b/src/java/org/apache/hadoop/hbase/mapreduce/package-info.java
index 30315c3afda..2656ef7b614 100644
--- a/src/java/org/apache/hadoop/hbase/mapreduce/package-info.java
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/package-info.java
@@ -109,8 +109,8 @@ partitioner.

Bulk import writing HFiles directly

If importing into a new table, it's possible to bypass the HBase API
and write your content directly to the filesystem, properly formatted as
-HBase data files (HFiles).  Your import will run faster, perhaps as much
-as an order of magnitude faster if not more.
+HBase data files (HFiles).  Your import will run faster, perhaps an order of
+magnitude faster if not more.

You will need to write a MapReduce job.  The map task will know how to
pull from your data source.  Your reduce task will need to be hooked up to
@@ -131,14 +131,20 @@
BUT the sort is scoped to the particular reducer.  It's not a global sort.
Given the default hash Partitioner, if the keys were 0-4 (inclusive), and you
had configured two reducers, reducer 0 would get keys 0, 2 and 4 whereas
reducer 1 would get keys 1 and 3 (in order).  For your bulk import to work,
-the keys need to be orderd so reducer 0 gets keys 0-2 and reducer 1 gets keys
-3-4 (See TotalOrderPartitioner up in hadoop for more on what this means).
+the keys need to be ordered so reducer 0 gets keys 0-2 and reducer 1 gets keys
+3-4 (See TotalOrderPartitioner up in hadoop for more on what this means.  See
+how it runs a sampler step first.  You may need to write one of these).
To achieve total ordering, you will likely need to write a Partitioner
that is intimate with your table's key namespace and that knows how
-to distribute keys among the reducers so a total order is maintained.
+to distribute keys among the reducers so a total order is maintained.  If your
+keys are distributed with some regularity across a defined key space -- i.e.
+you know the start and end keys -- then the {@link SimpleTotalOrderPartitioner}
+may be all you need.
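[Editor's illustration, not part of this patch: a minimal sketch of the job-side
hookup of SimpleTotalOrderPartitioner.  The key space "row0000".."row9999" and
the job name are invented for the example.]

// Minimal sketch: configure SimpleTotalOrderPartitioner on a job.
// The key space "row0000".."row9999" is invented for this example.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
import org.apache.hadoop.mapreduce.Job;

public class PartitionerConfigSketch {
  public static Job configure(Configuration conf) throws Exception {
    Job job = new Job(conf, "bulk-import");
    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
    // START is the smallest key in the key space; END is exclusive,
    // i.e. one larger than the biggest key you expect to see.
    job.getConfiguration().set(SimpleTotalOrderPartitioner.START, "row0000");
    job.getConfiguration().set(SimpleTotalOrderPartitioner.END, "row9999");
    return job;
  }
}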

-

-See org.apache.hadoop.hbase.mapreduce.TestHFileOutputFormat for an example that puts together
-{@link org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer} and {@link org.apache.hadoop.hbase.mapreduce.HFileOutputFormat}.

+

+See org.apache.hadoop.hbase.mapreduce.TestHFileOutputFormat for an example
+that puts together {@link org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer},
+{@link SimpleTotalOrderPartitioner}, and
+{@link org.apache.hadoop.hbase.mapreduce.HFileOutputFormat}.
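[Editor's illustration, not part of this patch: a condensed, self-contained
sketch of that same wiring.  LineToKVMapper, the column family "f", the
qualifier "q", and the /tmp paths are placeholders invented here; see the
test diff below for the real configuration.]

package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HFileBulkImportSketch {

  // Placeholder mapper: treats each input line as a row key and emits a
  // single KeyValue for it under the invented family "f", qualifier "q".
  static class LineToKVMapper
  extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
    throws IOException, InterruptedException {
      byte [] row = Bytes.toBytes(line.toString());
      context.write(new ImmutableBytesWritable(row),
        new KeyValue(row, Bytes.toBytes("f"), Bytes.toBytes("q"), row));
    }
  }

  public static Job createJob(Configuration conf) throws Exception {
    Job job = new Job(conf, "hfile-bulk-import");
    job.setJarByClass(HFileBulkImportSketch.class);
    job.setMapperClass(LineToKVMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
    job.setReducerClass(KeyValueSortReducer.class);
    job.setOutputFormatClass(HFileOutputFormat.class);
    // Key space bounds for the partitioner; END is exclusive.
    job.getConfiguration().set(SimpleTotalOrderPartitioner.START, "a");
    job.getConfiguration().set(SimpleTotalOrderPartitioner.END, "{");
    FileInputFormat.addInputPath(job, new Path("/tmp/input"));
    FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles"));
    return job;
  }
}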

HFileOutputFormat writes HFiles.  When your MapReduce job finishes, in your
output directory you will have many HFiles.  Run the script bin/loadtable.rb
diff --git a/src/test/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java b/src/test/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
index b71158d7a5c..867a63166ef 100644
--- a/src/test/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
+++ b/src/test/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PerformanceEvaluation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -172,6 +173,15 @@ public class TestHFileOutputFormat extends HBaseTestCase {
     job.setMapperClass(TestHFileOutputFormat.PEtoKVMapper.class);
     job.setMapOutputKeyClass(ImmutableBytesWritable.class);
     job.setMapOutputValueClass(KeyValue.class);
+    // This partitioner doesn't work well for numeric keys but we use it
+    // anyway just to demonstrate how to configure it.
+    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
+    // Set start and end rows for partitioner.
+    job.getConfiguration().set(SimpleTotalOrderPartitioner.START,
+      Bytes.toString(PerformanceEvaluation.format(0)));
+    int rows = this.conf.getInt("mapred.map.tasks", 1) * ROWSPERSPLIT;
+    job.getConfiguration().set(SimpleTotalOrderPartitioner.END,
+      Bytes.toString(PerformanceEvaluation.format(rows)));
     job.setReducerClass(KeyValueSortReducer.class);
     job.setOutputFormatClass(HFileOutputFormat.class);
     FileOutputFormat.setOutputPath(job, this.testDir);
@@ -179,4 +189,4 @@
     FileStatus [] files = this.fs.listStatus(this.testDir);
     assertTrue(files.length > 0);
   }
-}
+}
\ No newline at end of file
diff --git a/src/test/org/apache/hadoop/hbase/mapreduce/TestSimpleTotalOrderPartitioner.java b/src/test/org/apache/hadoop/hbase/mapreduce/TestSimpleTotalOrderPartitioner.java
new file mode 100644
index 00000000000..a69600e3753
--- /dev/null
+++ b/src/test/org/apache/hadoop/hbase/mapreduce/TestSimpleTotalOrderPartitioner.java
@@ -0,0 +1,68 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Test of simple partitioner.
+ */
+public class TestSimpleTotalOrderPartitioner extends HBaseTestCase {
+  public void testSplit() throws Exception {
+    String start = "a";
+    String end = "{";
+    SimpleTotalOrderPartitioner<byte []> p =
+      new SimpleTotalOrderPartitioner<byte []>();
+    this.conf.set(SimpleTotalOrderPartitioner.START, start);
+    this.conf.set(SimpleTotalOrderPartitioner.END, end);
+    p.setConf(this.conf);
+    ImmutableBytesWritable c = new ImmutableBytesWritable(Bytes.toBytes("c"));
+    // If one reduce, partition should be 0.
+    int partition = p.getPartition(c, HConstants.EMPTY_BYTE_ARRAY, 1);
+    assertEquals(0, partition);
+    // If two reduces, partition should be 0.
+    partition = p.getPartition(c, HConstants.EMPTY_BYTE_ARRAY, 2);
+    assertEquals(0, partition);
+    // Divide in 3.
+    partition = p.getPartition(c, HConstants.EMPTY_BYTE_ARRAY, 3);
+    assertEquals(0, partition);
+    ImmutableBytesWritable q = new ImmutableBytesWritable(Bytes.toBytes("q"));
+    partition = p.getPartition(q, HConstants.EMPTY_BYTE_ARRAY, 2);
+    assertEquals(1, partition);
+    partition = p.getPartition(q, HConstants.EMPTY_BYTE_ARRAY, 3);
+    assertEquals(2, partition);
+    // What about end and start keys.
+    ImmutableBytesWritable startBytes =
+      new ImmutableBytesWritable(Bytes.toBytes(start));
+    partition = p.getPartition(startBytes, HConstants.EMPTY_BYTE_ARRAY, 2);
+    assertEquals(0, partition);
+    partition = p.getPartition(startBytes, HConstants.EMPTY_BYTE_ARRAY, 3);
+    assertEquals(0, partition);
+    ImmutableBytesWritable endBytes =
+      new ImmutableBytesWritable(Bytes.toBytes("z"));
+    partition = p.getPartition(endBytes, HConstants.EMPTY_BYTE_ARRAY, 2);
+    assertEquals(1, partition);
+    partition = p.getPartition(endBytes, HConstants.EMPTY_BYTE_ARRAY, 3);
+    assertEquals(2, partition);
+  }
+}
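[Editor's illustration, not part of this patch: a standalone snippet showing
why the test above expects 'q' to land in the last of three partitions.
Bytes.split does its arithmetic on the raw byte range, so the boundaries may
not be where intuition puts them; the printed values are indicative.]

import org.apache.hadoop.hbase.util.Bytes;

public class SplitDemo {
  public static void main(String[] args) {
    // Same key space as the test: start "a", end "{" (one past 'z').
    // Asking for 2 split points yields 4 boundaries delimiting 3 ranges,
    // exactly what getPartition computes for 3 reduces.
    byte [][] splits = Bytes.split(Bytes.toBytes("a"), Bytes.toBytes("{"), 2);
    for (byte [] split : splits) {
      System.out.println(Bytes.toString(split));
    }
    // Expected output is along the lines of: a, i, q, { -- so 'q' sits on
    // (or just past) the last internal boundary and is assigned partition 2.
  }
}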