HBASE-1901 General partitioner for hbase-48 bulk (behind the api, write hfiles direct) uploader
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@883833 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent 866453467e
commit 7486a7efe2
@@ -199,6 +199,10 @@ Release 0.21.0 - Unreleased
    HBASE-1961  HBase EC2 scripts
    HBASE-1971  Unit test the full WAL replay cycle

+NEW FEATURES
+   HBASE-1901  "General" partitioner for "hbase-48" bulk (behind the api, write
+               hfiles direct) uploader
+
 OPTIMIZATIONS
    HBASE-410   [testing] Speed up the test suite

@@ -0,0 +1,96 @@
/**
 * Copyright 2009 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * A partitioner that takes start and end keys and uses BigDecimal to figure
 * out which reduce a key belongs to.  Pass the start and end
 * keys in the Configuration using <code>hbase.simpletotalorder.start</code>
 * and <code>hbase.simpletotalorder.end</code>.  The end key needs to be
 * exclusive; i.e. one larger than the biggest key in your key space.
 * You may be surprised at how this class partitions the space; it may not
 * align with preconceptions; e.g. a start key of zero and an end key of 100
 * divided in ten will not make regions whose range is 0-10, 10-20, and so on,
 * because the interpolation is over the raw key bytes, not their decimal
 * values.  Make your own partitioner if you need the region spacing to come
 * out a particular way.
 * @param <VALUE> the value type passed through to the reducer (unused here)
 * @see #START
 * @see #END
 */
public class SimpleTotalOrderPartitioner<VALUE> extends Partitioner<ImmutableBytesWritable, VALUE>
implements Configurable {
  private final Log LOG = LogFactory.getLog(this.getClass());
  public static final String START = "hbase.simpletotalorder.start";
  public static final String END = "hbase.simpletotalorder.end";
  private Configuration c;
  private byte [] startkey;
  private byte [] endkey;
  private byte [][] splits;
  private int lastReduces = -1;

  @Override
  public int getPartition(final ImmutableBytesWritable key, final VALUE value,
      final int reduces) {
    if (reduces == 1) return 0;
    if (this.lastReduces != reduces) {
      this.splits = Bytes.split(this.startkey, this.endkey, reduces - 1);
      for (int i = 0; i < splits.length; i++) {
        LOG.info(Bytes.toString(splits[i]));
      }
      // Remember the reduce count so the splits are only recomputed (and
      // logged) when it changes; the original omitted this assignment and so
      // recalculated the splits on every call.
      this.lastReduces = reduces;
    }
    int pos = Bytes.binarySearch(this.splits, key.get(), key.getOffset(),
      key.getLength(), Bytes.BYTES_RAWCOMPARATOR);
    // Below code is from hfile index search.
    if (pos < 0) {
      // Not an exact match: turn the negative insertion point into the index
      // of the split range the key falls into.
      pos++;
      pos *= -1;
      if (pos == 0) {
        // falls before the beginning of the file.
        throw new RuntimeException("Key outside start/stop range: " +
          key.toString());
      }
      pos--;
    }
    return pos;
  }

  @Override
  public Configuration getConf() {
    return this.c;
  }

  @Override
  public void setConf(Configuration conf) {
    this.c = conf;
    String startStr = this.c.get(START);
    String endStr = this.c.get(END);
    LOG.info("startkey=" + startStr + ", endkey=" + endStr);
    this.startkey = Bytes.toBytes(startStr);
    this.endkey = Bytes.toBytes(endStr);
  }
}
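The javadoc's warning about unintuitive spacing, and the partition numbers asserted in TestSimpleTotalOrderPartitioner further down, both follow from interpolating split points over raw key bytes. Below is a minimal sketch for the single-byte range "a" to "{" that the test uses, assuming the interpolation steps by integer division of the byte-wise difference (an assumption for illustration; the real Bytes.split is BigDecimal-based and handles multi-byte keys):

public class SplitSketch {
  public static void main(String[] args) {
    byte start = 'a';
    byte end = '{';        // exclusive end key: the byte one past 'z'
    int reducers = 3;
    int step = (end - start) / reducers;   // 26 / 3 = 8, remainder discarded
    // Prints boundaries a, i, q, { -- so "c" falls in partition 0, while "q",
    // sitting exactly on a boundary, lands in partition 2, matching the
    // assertions in the test below.
    for (int i = 0; i < reducers; i++) {
      System.out.println("boundary " + i + ": " + (char) (start + i * step));
    }
    System.out.println("boundary " + reducers + ": " + (char) end);
  }
}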
@@ -109,8 +109,8 @@ partitioner.
 <h2><a name="bulk">Bulk import writing HFiles directly</a></h2>
 <p>If importing into a new table, it's possible to by-pass the HBase API
 and write your content directly to the filesystem properly formatted as
-HBase data files (HFiles). Your import will run faster, perhaps as much
-as an order of magnitude faster if not more.
+HBase data files (HFiles). Your import will run faster, perhaps an order of
+magnitude faster if not more.
 </p>
 <p>You will need to write a MapReduce job. The map task will know how to
 pull from your data source. Your reduce task will need to be hooked up to
@@ -131,14 +131,20 @@ BUT the sort is scoped to the particular reducer. It's not a global sort.
 Given the default hash Partitioner, if the keys were 0-4 (inclusive), and you
 had configured two reducers, reducer 0 would get keys 0, 2 and 4 whereas
 reducer 1 would get keys 1 and 3 (in order). For your bulk import to work,
-the keys need to be ordered so reducer 0 gets keys 0-2 and reducer 1 gets keys
-3-4 (See TotalOrderPartitioner up in hadoop for more on what this means).
+the keys need to be ordered so reducer 0 gets keys 0-2 and reducer 1 gets keys
+3-4 (See TotalOrderPartitioner up in hadoop for more on what this means. See
+how it runs a sampler step first. You may need to write one of these).
 To achieve total ordering, you will likely need to write a Partitioner
 that is intimate with your table's key namespace and that knows how
-to distribute keys among the reducers so a total order is maintained.
+to distribute keys among the reducers so a total order is maintained. If your
+keys are distributed with some regularity across a defined key space -- i.e.
+you know the start and end keys -- then the {@link SimpleTotalOrderPartitioner}
+may be all you need.
 </p>
-<p>See org.apache.hadoop.hbase.mapreduce.TestHFileOutputFormat for an example that puts together
-{@link org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer} and {@link org.apache.hadoop.hbase.mapreduce.HFileOutputFormat}.</p>
+<p>See org.apache.hadoop.hbase.mapreduce.TestHFileOutputFormat for an example
+that puts together {@link org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer},
+{@link SimpleTotalOrderPartitioner}, and
+{@link org.apache.hadoop.hbase.mapreduce.HFileOutputFormat}.</p>

 <p>HFileOutputFormat writes HFiles. When your MapReduce job finishes, in your
 output directory you will have many HFiles. Run the script <code>bin/loadtable.rb</code>
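The 0-4 example above can be checked in a few lines of plain Java: Hadoop's default HashPartitioner computes (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks, and an integer's hash code is the integer itself, which is exactly what strands keys 0, 2 and 4 on reducer 0. A sketch contrasting that with the assignment a bulk import needs (the two-reducer cutoff at key 2 is just this example's):

public class PartitionContrast {
  public static void main(String[] args) {
    int reducers = 2;
    for (int key = 0; key <= 4; key++) {
      // Default HashPartitioner arithmetic: yields 0,1,0,1,0 for keys 0-4.
      int hashed = (Integer.valueOf(key).hashCode() & Integer.MAX_VALUE) % reducers;
      // Total-order assignment: keys 0-2 to reducer 0, keys 3-4 to reducer 1.
      int ordered = key <= 2 ? 0 : 1;
      System.out.println("key " + key + ": hash -> " + hashed
        + ", total order -> " + ordered);
    }
  }
}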
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -172,6 +173,15 @@ public class TestHFileOutputFormat extends HBaseTestCase {
     job.setMapperClass(TestHFileOutputFormat.PEtoKVMapper.class);
     job.setMapOutputKeyClass(ImmutableBytesWritable.class);
     job.setMapOutputValueClass(KeyValue.class);
+    // This partitioner doesn't work well for number keys but we use it anyway
+    // just to demonstrate how to configure it.
+    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
+    // Set start and end rows for partitioner.
+    job.getConfiguration().set(SimpleTotalOrderPartitioner.START,
+      Bytes.toString(PerformanceEvaluation.format(0)));
+    int rows = this.conf.getInt("mapred.map.tasks", 1) * ROWSPERSPLIT;
+    job.getConfiguration().set(SimpleTotalOrderPartitioner.END,
+      Bytes.toString(PerformanceEvaluation.format(rows)));
     job.setReducerClass(KeyValueSortReducer.class);
     job.setOutputFormatClass(HFileOutputFormat.class);
     FileOutputFormat.setOutputPath(job, this.testDir);
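This configuration only yields a sensible total order because PerformanceEvaluation.format produces fixed-width row keys, so lexicographic byte order matches numeric order. A hedged stand-in sketch of that idea (the ten-digit zero-padded width is an assumption for illustration, not something this diff confirms):

public class RowKeyFormat {
  // Hypothetical stand-in for PerformanceEvaluation.format: zero-pad row
  // numbers so byte-wise ordering of the keys matches numeric ordering.
  static byte [] format(final int number) {
    return String.format("%010d", number).getBytes();
  }

  public static void main(String[] args) {
    // Without padding, "10" would sort before "2"; with it, order is numeric.
    System.out.println(new String(format(2)));    // 0000000002
    System.out.println(new String(format(10)));   // 0000000010
  }
}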
@@ -0,0 +1,68 @@
/**
 * Copyright 2009 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Test of simple partitioner.
 */
public class TestSimpleTotalOrderPartitioner extends HBaseTestCase {
  public void testSplit() throws Exception {
    String start = "a";
    // The end key is exclusive, so use '{', the byte value one past 'z'.
    String end = "{";
    SimpleTotalOrderPartitioner<byte []> p =
      new SimpleTotalOrderPartitioner<byte []>();
    this.conf.set(SimpleTotalOrderPartitioner.START, start);
    this.conf.set(SimpleTotalOrderPartitioner.END, end);
    p.setConf(this.conf);
    ImmutableBytesWritable c = new ImmutableBytesWritable(Bytes.toBytes("c"));
    // If one reduce, partition should be 0.
    int partition = p.getPartition(c, HConstants.EMPTY_BYTE_ARRAY, 1);
    assertEquals(0, partition);
    // If two reduces, partition should be 0.
    partition = p.getPartition(c, HConstants.EMPTY_BYTE_ARRAY, 2);
    assertEquals(0, partition);
    // Divide in 3.
    partition = p.getPartition(c, HConstants.EMPTY_BYTE_ARRAY, 3);
    assertEquals(0, partition);
    ImmutableBytesWritable q = new ImmutableBytesWritable(Bytes.toBytes("q"));
    partition = p.getPartition(q, HConstants.EMPTY_BYTE_ARRAY, 2);
    assertEquals(1, partition);
    partition = p.getPartition(q, HConstants.EMPTY_BYTE_ARRAY, 3);
    assertEquals(2, partition);
    // What about end and start keys?
    ImmutableBytesWritable startBytes =
      new ImmutableBytesWritable(Bytes.toBytes(start));
    partition = p.getPartition(startBytes, HConstants.EMPTY_BYTE_ARRAY, 2);
    assertEquals(0, partition);
    partition = p.getPartition(startBytes, HConstants.EMPTY_BYTE_ARRAY, 3);
    assertEquals(0, partition);
    // 'z' is the biggest key actually in range ('{' itself is excluded).
    ImmutableBytesWritable endBytes =
      new ImmutableBytesWritable(Bytes.toBytes("z"));
    partition = p.getPartition(endBytes, HConstants.EMPTY_BYTE_ARRAY, 2);
    assertEquals(1, partition);
    partition = p.getPartition(endBytes, HConstants.EMPTY_BYTE_ARRAY, 3);
    assertEquals(2, partition);
  }
}