HBASE-6706 Remove TotalOrderPartitioner

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1381881 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2012-09-07 05:28:31 +00:00
parent 45f309beff
commit 773650bb0c
13 changed files with 178 additions and 910 deletions

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
/**
* A compatibility shim layer for interacting with different versions of Hadoop.
*/
//NOTE: we can move this under src/main if main code wants to use this shim layer
public interface HadoopShims {
/**
* Returns a TaskAttemptContext instance created from the given parameters.
* @param job an instance of o.a.h.mapreduce.Job
* @param taskId an identifier for the task attempt id. Should be parsable by
* TaskAttemptId.forName()
* @return a concrete TaskAttemptContext instance of o.a.h.mapreduce.TaskAttemptContext
*/
public <T,J> T createTestTaskAttemptContext(final J job, final String taskId);
}

View File

@ -58,6 +58,12 @@ limitations under the License.
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
/**
* Compatibility shim layer implementation for Hadoop-1.
*/
public class HadoopShimsImpl implements HadoopShims {
/**
* Returns a TaskAttemptContext instance created from the given parameters.
* @param job an instance of o.a.h.mapreduce.Job
* @param taskId an identifier for the task attempt id. Should be parsable by
* TaskAttemptId.forName()
* @return a concrete TaskAttemptContext instance of o.a.h.mapreduce.TaskAttemptContext
*/
@Override
@SuppressWarnings("unchecked")
public <T, J> T createTestTaskAttemptContext(J job, String taskId) {
Job j = (Job)job;
return (T)new TaskAttemptContext(j.getConfiguration(), TaskAttemptID.forName(taskId));
}
}

View File

@ -0,0 +1 @@
org.apache.hadoop.hbase.HadoopShimsImpl

View File

@ -112,6 +112,12 @@ limitations under the License.
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
/**
* Compatibility shim layer implementation for Hadoop-2.
*/
public class HadoopShimsImpl implements HadoopShims {
/**
* Returns a TaskAttemptContext instance created from the given parameters.
* @param job an instance of o.a.h.mapreduce.Job
* @param taskId an identifier for the task attempt id. Should be parsable by
* TaskAttemptId.forName()
* @return a concrete TaskAttemptContext instance of o.a.h.mapreduce.TaskAttemptContext
*/
@Override
@SuppressWarnings("unchecked")
public <T, J> T createTestTaskAttemptContext(J job, String taskId) {
Job j = (Job)job;
return (T)new TaskAttemptContextImpl(j.getConfiguration(), TaskAttemptID.forName(taskId));
}
}

View File

@ -0,0 +1 @@
org.apache.hadoop.hbase.HadoopShimsImpl

View File

@ -266,10 +266,21 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>${compat.module}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>${compat.module}</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- General dependencies -->
<dependency>

View File

@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
@ -62,11 +61,11 @@ import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
/**
* Writes HFiles. Passed KeyValues must arrive in order.
@ -317,13 +316,8 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
public static void configureIncrementalLoad(Job job, HTable table)
throws IOException {
Configuration conf = job.getConfiguration();
Class<? extends Partitioner> topClass;
try {
topClass = getTotalOrderPartitionerClass();
} catch (ClassNotFoundException e) {
throw new IOException("Failed getting TotalOrderPartitioner", e);
}
job.setPartitionerClass(topClass);
job.setPartitionerClass(TotalOrderPartitioner.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class);
job.setOutputFormatClass(HFileOutputFormat.class);
@ -355,11 +349,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
URI cacheUri;
try {
// Below we make explicit reference to the bundled TOP. Its cheating.
// We are assume the define in the hbase bundled TOP is as it is in
// hadoop (whether 0.20 or 0.22, etc.)
cacheUri = new URI(partitionsPath.toString() + "#" +
org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner.DEFAULT_PATH);
cacheUri = new URI(partitionsPath.toString() + "#" + TotalOrderPartitioner.DEFAULT_PATH);
} catch (URISyntaxException e) {
throw new IOException(e);
}
@ -373,27 +363,6 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
LOG.info("Incremental table output configured.");
}
/**
* If > hadoop 0.20, then we want to use the hadoop TotalOrderPartitioner.
* If 0.20, then we want to use the TOP that we have under hadoopbackport.
* This method is about hbase being able to run on different versions of
* hadoop. In 0.20.x hadoops, we have to use the TOP that is bundled with
* hbase. Otherwise, we use the one in Hadoop.
* @return Instance of the TotalOrderPartitioner class
* @throws ClassNotFoundException If can't find a TotalOrderPartitioner.
*/
private static Class<? extends Partitioner> getTotalOrderPartitionerClass()
throws ClassNotFoundException {
Class<? extends Partitioner> clazz = null;
try {
clazz = (Class<? extends Partitioner>) Class.forName("org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner");
} catch (ClassNotFoundException e) {
clazz =
(Class<? extends Partitioner>) Class.forName("org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner");
}
return clazz;
}
/**
* Run inside the task to deserialize column family to compression algorithm
* map from the

View File

@ -1,443 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce.hadoopbackport;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* Utility for collecting samples and writing a partition file for
* {@link TotalOrderPartitioner}.
*
* This is an identical copy of o.a.h.mapreduce.lib.partition.TotalOrderPartitioner
* from Hadoop trunk at r961542, with the exception of replacing
* TaskAttemptContextImpl with TaskAttemptContext.
*/
public class InputSampler<K,V> extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(InputSampler.class);
static int printUsage() {
System.out.println("sampler -r <reduces>\n" +
" [-inFormat <input format class>]\n" +
" [-keyClass <map input & output key class>]\n" +
" [-splitRandom <double pcnt> <numSamples> <maxsplits> | " +
" // Sample from random splits at random (general)\n" +
" -splitSample <numSamples> <maxsplits> | " +
" // Sample from first records in splits (random data)\n"+
" -splitInterval <double pcnt> <maxsplits>]" +
" // Sample from splits at intervals (sorted data)");
System.out.println("Default sampler: -splitRandom 0.1 10000 10");
ToolRunner.printGenericCommandUsage(System.out);
return -1;
}
public InputSampler(Configuration conf) {
setConf(conf);
}
/**
* Interface to sample using an
* {@link org.apache.hadoop.mapreduce.InputFormat}.
*/
public interface Sampler<K,V> {
/**
* For a given job, collect and return a subset of the keys from the
* input data.
*/
K[] getSample(InputFormat<K,V> inf, Job job)
throws IOException, InterruptedException;
}
/**
* Samples the first n records from s splits.
* Inexpensive way to sample random data.
*/
public static class SplitSampler<K,V> implements Sampler<K,V> {
private final int numSamples;
private final int maxSplitsSampled;
/**
* Create a SplitSampler sampling <em>all</em> splits.
* Takes the first numSamples / numSplits records from each split.
* @param numSamples Total number of samples to obtain from all selected
* splits.
*/
public SplitSampler(int numSamples) {
this(numSamples, Integer.MAX_VALUE);
}
/**
* Create a new SplitSampler.
* @param numSamples Total number of samples to obtain from all selected
* splits.
* @param maxSplitsSampled The maximum number of splits to examine.
*/
public SplitSampler(int numSamples, int maxSplitsSampled) {
this.numSamples = numSamples;
this.maxSplitsSampled = maxSplitsSampled;
}
/**
* From each split sampled, take the first numSamples / numSplits records.
*/
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, Job job)
throws IOException, InterruptedException {
List<InputSplit> splits = inf.getSplits(job);
ArrayList<K> samples = new ArrayList<K>(numSamples);
int splitsToSample = Math.min(maxSplitsSampled, splits.size());
int samplesPerSplit = numSamples / splitsToSample;
long records = 0;
for (int i = 0; i < splitsToSample; ++i) {
TaskAttemptContext samplingContext = getTaskAttemptContext(job);
RecordReader<K,V> reader = inf.createRecordReader(
splits.get(i), samplingContext);
reader.initialize(splits.get(i), samplingContext);
while (reader.nextKeyValue()) {
samples.add(ReflectionUtils.copy(job.getConfiguration(),
reader.getCurrentKey(), null));
++records;
if ((i+1) * samplesPerSplit <= records) {
break;
}
}
reader.close();
}
return (K[])samples.toArray();
}
}
/**
* This method is about making hbase portable, making it so it can run on
* more than just hadoop 0.20. In later hadoops, TaskAttemptContext became
* an Interface. But in hadoops where TAC is an Interface, we shouldn't
* be using the classes that are in this package; we should be using the
* native Hadoop ones (We'll throw a ClassNotFoundException if end up in
* here when we should be using native hadoop TotalOrderPartitioner).
* @param job
* @return Context
* @throws IOException
*/
public static TaskAttemptContext getTaskAttemptContext(final Job job)
throws IOException {
Constructor<TaskAttemptContext> c;
try {
c = TaskAttemptContext.class.getConstructor(Configuration.class, TaskAttemptID.class);
} catch (Exception e) {
throw new IOException("Failed getting constructor", e);
}
try {
return c.newInstance(job.getConfiguration(), new TaskAttemptID());
} catch (Exception e) {
throw new IOException("Failed creating instance", e);
}
}
/**
* Sample from random points in the input.
* General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
* each split.
*/
public static class RandomSampler<K,V> implements Sampler<K,V> {
private double freq;
private final int numSamples;
private final int maxSplitsSampled;
/**
* Create a new RandomSampler sampling <em>all</em> splits.
* This will read every split at the client, which is very expensive.
* @param freq Probability with which a key will be chosen.
* @param numSamples Total number of samples to obtain from all selected
* splits.
*/
public RandomSampler(double freq, int numSamples) {
this(freq, numSamples, Integer.MAX_VALUE);
}
/**
* Create a new RandomSampler.
* @param freq Probability with which a key will be chosen.
* @param numSamples Total number of samples to obtain from all selected
* splits.
* @param maxSplitsSampled The maximum number of splits to examine.
*/
public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
this.freq = freq;
this.numSamples = numSamples;
this.maxSplitsSampled = maxSplitsSampled;
}
/**
* Randomize the split order, then take the specified number of keys from
* each split sampled, where each key is selected with the specified
* probability and possibly replaced by a subsequently selected key when
* the quota of keys from that split is satisfied.
*/
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, Job job)
throws IOException, InterruptedException {
List<InputSplit> splits = inf.getSplits(job);
ArrayList<K> samples = new ArrayList<K>(numSamples);
int splitsToSample = Math.min(maxSplitsSampled, splits.size());
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
LOG.debug("seed: " + seed);
// shuffle splits
for (int i = 0; i < splits.size(); ++i) {
InputSplit tmp = splits.get(i);
int j = r.nextInt(splits.size());
splits.set(i, splits.get(j));
splits.set(j, tmp);
}
// our target rate is in terms of the maximum number of sample splits,
// but we accept the possibility of sampling additional splits to hit
// the target sample keyset
for (int i = 0; i < splitsToSample ||
(i < splits.size() && samples.size() < numSamples); ++i) {
TaskAttemptContext samplingContext = getTaskAttemptContext(job);
RecordReader<K,V> reader = inf.createRecordReader(
splits.get(i), samplingContext);
reader.initialize(splits.get(i), samplingContext);
while (reader.nextKeyValue()) {
if (r.nextDouble() <= freq) {
if (samples.size() < numSamples) {
samples.add(ReflectionUtils.copy(job.getConfiguration(),
reader.getCurrentKey(), null));
} else {
// When exceeding the maximum number of samples, replace a
// random element with this one, then adjust the frequency
// to reflect the possibility of existing elements being
// pushed out
int ind = r.nextInt(numSamples);
if (ind != numSamples) {
samples.set(ind, ReflectionUtils.copy(job.getConfiguration(),
reader.getCurrentKey(), null));
}
freq *= (numSamples - 1) / (double) numSamples;
}
}
}
reader.close();
}
return (K[])samples.toArray();
}
}
/**
* Sample from s splits at regular intervals.
* Useful for sorted data.
*/
public static class IntervalSampler<K,V> implements Sampler<K,V> {
private final double freq;
private final int maxSplitsSampled;
/**
* Create a new IntervalSampler sampling <em>all</em> splits.
* @param freq The frequency with which records will be emitted.
*/
public IntervalSampler(double freq) {
this(freq, Integer.MAX_VALUE);
}
/**
* Create a new IntervalSampler.
* @param freq The frequency with which records will be emitted.
* @param maxSplitsSampled The maximum number of splits to examine.
* @see #getSample
*/
public IntervalSampler(double freq, int maxSplitsSampled) {
this.freq = freq;
this.maxSplitsSampled = maxSplitsSampled;
}
/**
* For each split sampled, emit when the ratio of the number of records
* retained to the total record count is less than the specified
* frequency.
*/
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, Job job)
throws IOException, InterruptedException {
List<InputSplit> splits = inf.getSplits(job);
ArrayList<K> samples = new ArrayList<K>();
int splitsToSample = Math.min(maxSplitsSampled, splits.size());
long records = 0;
long kept = 0;
for (int i = 0; i < splitsToSample; ++i) {
TaskAttemptContext samplingContext = getTaskAttemptContext(job);
RecordReader<K,V> reader = inf.createRecordReader(
splits.get(i), samplingContext);
reader.initialize(splits.get(i), samplingContext);
while (reader.nextKeyValue()) {
++records;
if ((double) kept / records < freq) {
samples.add(ReflectionUtils.copy(job.getConfiguration(),
reader.getCurrentKey(), null));
++kept;
}
}
reader.close();
}
return (K[])samples.toArray();
}
}
/**
* Write a partition file for the given job, using the Sampler provided.
* Queries the sampler for a sample keyset, sorts by the output key
* comparator, selects the keys for each rank, and writes to the destination
* returned from {@link TotalOrderPartitioner#getPartitionFile}.
*/
@SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
public static <K,V> void writePartitionFile(Job job, Sampler<K,V> sampler)
throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = job.getConfiguration();
final InputFormat inf =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
int numPartitions = job.getNumReduceTasks();
K[] samples = sampler.getSample(inf, job);
LOG.info("Using " + samples.length + " samples");
RawComparator<K> comparator =
(RawComparator<K>) job.getSortComparator();
Arrays.sort(samples, comparator);
Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
FileSystem fs = dst.getFileSystem(conf);
if (fs.exists(dst)) {
fs.delete(dst, false);
}
SequenceFile.Writer writer = SequenceFile.createWriter(fs,
conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
NullWritable nullValue = NullWritable.get();
float stepSize = samples.length / (float) numPartitions;
int last = -1;
for(int i = 1; i < numPartitions; ++i) {
int k = Math.round(stepSize * i);
while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
++k;
}
writer.append(samples[k], nullValue);
last = k;
}
writer.close();
}
/**
* Driver for InputSampler from the command line.
* Configures a JobConf instance and calls {@link #writePartitionFile}.
*/
public int run(String[] args) throws Exception {
Job job = new Job(getConf());
ArrayList<String> otherArgs = new ArrayList<String>();
Sampler<K,V> sampler = null;
for(int i=0; i < args.length; ++i) {
try {
if ("-r".equals(args[i])) {
job.setNumReduceTasks(Integer.parseInt(args[++i]));
} else if ("-inFormat".equals(args[i])) {
job.setInputFormatClass(
Class.forName(args[++i]).asSubclass(InputFormat.class));
} else if ("-keyClass".equals(args[i])) {
job.setMapOutputKeyClass(
Class.forName(args[++i]).asSubclass(WritableComparable.class));
} else if ("-splitSample".equals(args[i])) {
int numSamples = Integer.parseInt(args[++i]);
int maxSplits = Integer.parseInt(args[++i]);
if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
sampler = new SplitSampler<K,V>(numSamples, maxSplits);
} else if ("-splitRandom".equals(args[i])) {
double pcnt = Double.parseDouble(args[++i]);
int numSamples = Integer.parseInt(args[++i]);
int maxSplits = Integer.parseInt(args[++i]);
if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
sampler = new RandomSampler<K,V>(pcnt, numSamples, maxSplits);
} else if ("-splitInterval".equals(args[i])) {
double pcnt = Double.parseDouble(args[++i]);
int maxSplits = Integer.parseInt(args[++i]);
if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
sampler = new IntervalSampler<K,V>(pcnt, maxSplits);
} else {
otherArgs.add(args[i]);
}
} catch (NumberFormatException except) {
System.out.println("ERROR: Integer expected instead of " + args[i]);
return printUsage();
} catch (ArrayIndexOutOfBoundsException except) {
System.out.println("ERROR: Required parameter missing from " +
args[i-1]);
return printUsage();
}
}
if (job.getNumReduceTasks() <= 1) {
System.err.println("Sampler requires more than one reducer");
return printUsage();
}
if (otherArgs.size() < 2) {
System.out.println("ERROR: Wrong number of parameters: ");
return printUsage();
}
if (null == sampler) {
sampler = new RandomSampler<K,V>(0.1, 10000, 10);
}
Path outf = new Path(otherArgs.remove(otherArgs.size() - 1));
TotalOrderPartitioner.setPartitionFile(getConf(), outf);
for (String s : otherArgs) {
FileInputFormat.addInputPath(job, new Path(s));
}
InputSampler.<K,V>writePartitionFile(job, sampler);
return 0;
}
public static void main(String[] args) throws Exception {
InputSampler<?,?> sampler = new InputSampler(new Configuration());
int res = ToolRunner.run(sampler, args);
System.exit(res);
}
}

View File

@ -1,401 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce.hadoopbackport;
import java.io.IOException;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.util.ReflectionUtils;
/**
* Partitioner effecting a total order by reading split points from
* an externally generated source.
*
* This is an identical copy of o.a.h.mapreduce.lib.partition.TotalOrderPartitioner
* from Hadoop trunk at r910774.
*/
public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
extends Partitioner<K,V> implements Configurable {
private Node partitions;
public static final String DEFAULT_PATH = "_partition.lst";
public static final String PARTITIONER_PATH =
"mapreduce.totalorderpartitioner.path";
public static final String MAX_TRIE_DEPTH =
"mapreduce.totalorderpartitioner.trie.maxdepth";
public static final String NATURAL_ORDER =
"mapreduce.totalorderpartitioner.naturalorder";
Configuration conf;
public TotalOrderPartitioner() { }
/**
* Read in the partition file and build indexing data structures.
* If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and
* <tt>total.order.partitioner.natural.order</tt> is not false, a trie
* of the first <tt>total.order.partitioner.max.trie.depth</tt>(2) + 1 bytes
* will be built. Otherwise, keys will be located using a binary search of
* the partition keyset using the {@link org.apache.hadoop.io.RawComparator}
* defined for this job. The input file must be sorted with the same
* comparator and contain {@link Job#getNumReduceTasks()} - 1 keys.
*/
@SuppressWarnings("unchecked") // keytype from conf not static
public void setConf(Configuration conf) {
try {
this.conf = conf;
String parts = getPartitionFile(conf);
final Path partFile = new Path(parts);
final FileSystem fs = (DEFAULT_PATH.equals(parts))
? FileSystem.getLocal(conf) // assume in DistributedCache
: partFile.getFileSystem(conf);
Job job = new Job(conf);
Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
K[] splitPoints = readPartitions(fs, partFile, keyClass, conf);
if (splitPoints.length != job.getNumReduceTasks() - 1) {
throw new IOException("Wrong number of partitions in keyset:"
+ splitPoints.length);
}
RawComparator<K> comparator =
(RawComparator<K>) job.getSortComparator();
for (int i = 0; i < splitPoints.length - 1; ++i) {
if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
throw new IOException("Split points are out of order");
}
}
boolean natOrder =
conf.getBoolean(NATURAL_ORDER, true);
if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
partitions = buildTrie((BinaryComparable[])splitPoints, 0,
splitPoints.length, new byte[0],
// Now that blocks of identical splitless trie nodes are
// represented reentrantly, and we develop a leaf for any trie
// node with only one split point, the only reason for a depth
// limit is to refute stack overflow or bloat in the pathological
// case where the split points are long and mostly look like bytes
// iii...iixii...iii . Therefore, we make the default depth
// limit large but not huge.
conf.getInt(MAX_TRIE_DEPTH, 200));
} else {
partitions = new BinarySearchNode(splitPoints, comparator);
}
} catch (IOException e) {
throw new IllegalArgumentException("Can't read partitions file", e);
}
}
public Configuration getConf() {
return conf;
}
// by construction, we know if our keytype
@SuppressWarnings("unchecked") // is memcmp-able and uses the trie
public int getPartition(K key, V value, int numPartitions) {
return partitions.findPartition(key);
}
/**
* Set the path to the SequenceFile storing the sorted partition keyset.
* It must be the case that for <tt>R</tt> reduces, there are <tt>R-1</tt>
* keys in the SequenceFile.
*/
public static void setPartitionFile(Configuration conf, Path p) {
conf.set(PARTITIONER_PATH, p.toString());
}
/**
* Get the path to the SequenceFile storing the sorted partition keyset.
* @see #setPartitionFile(Configuration, Path)
*/
public static String getPartitionFile(Configuration conf) {
return conf.get(PARTITIONER_PATH, DEFAULT_PATH);
}
/**
* Interface to the partitioner to locate a key in the partition keyset.
*/
interface Node<T> {
/**
* Locate partition in keyset K, st [Ki..Ki+1) defines a partition,
* with implicit K0 = -inf, Kn = +inf, and |K| = #partitions - 1.
*/
int findPartition(T key);
}
/**
* Base class for trie nodes. If the keytype is memcomp-able, this builds
* tries of the first <tt>total.order.partitioner.max.trie.depth</tt>
* bytes.
*/
static abstract class TrieNode implements Node<BinaryComparable> {
private final int level;
TrieNode(int level) {
this.level = level;
}
int getLevel() {
return level;
}
}
/**
* For types that are not {@link org.apache.hadoop.io.BinaryComparable} or
* where disabled by <tt>total.order.partitioner.natural.order</tt>,
* search the partition keyset with a binary search.
*/
class BinarySearchNode implements Node<K> {
private final K[] splitPoints;
private final RawComparator<K> comparator;
BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
this.splitPoints = splitPoints;
this.comparator = comparator;
}
public int findPartition(K key) {
final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1;
return (pos < 0) ? -pos : pos;
}
}
/**
* An inner trie node that contains 256 children based on the next
* character.
*/
class InnerTrieNode extends TrieNode {
private TrieNode[] child = new TrieNode[256];
InnerTrieNode(int level) {
super(level);
}
public int findPartition(BinaryComparable key) {
int level = getLevel();
if (key.getLength() <= level) {
return child[0].findPartition(key);
}
return child[0xFF & key.getBytes()[level]].findPartition(key);
}
}
/**
* @param level the tree depth at this node
* @param splitPoints the full split point vector, which holds
* the split point or points this leaf node
* should contain
* @param lower first INcluded element of splitPoints
* @param upper first EXcluded element of splitPoints
* @return a leaf node. They come in three kinds: no split points
* [and the findParttion returns a canned index], one split
* point [and we compare with a single comparand], or more
* than one [and we do a binary search]. The last case is
* rare.
*/
private TrieNode LeafTrieNodeFactory
(int level, BinaryComparable[] splitPoints, int lower, int upper) {
switch (upper - lower) {
case 0:
return new UnsplitTrieNode(level, lower);
case 1:
return new SinglySplitTrieNode(level, splitPoints, lower);
default:
return new LeafTrieNode(level, splitPoints, lower, upper);
}
}
/**
* A leaf trie node that scans for the key between lower..upper.
*
* We don't generate many of these now, since we usually continue trie-ing
* when more than one split point remains at this level. and we make different
* objects for nodes with 0 or 1 split point.
*/
private class LeafTrieNode extends TrieNode {
final int lower;
final int upper;
final BinaryComparable[] splitPoints;
LeafTrieNode(int level, BinaryComparable[] splitPoints, int lower, int upper) {
super(level);
this.lower = lower;
this.upper = upper;
this.splitPoints = splitPoints;
}
public int findPartition(BinaryComparable key) {
final int pos = Arrays.binarySearch(splitPoints, lower, upper, key) + 1;
return (pos < 0) ? -pos : pos;
}
}
private class UnsplitTrieNode extends TrieNode {
final int result;
UnsplitTrieNode(int level, int value) {
super(level);
this.result = value;
}
public int findPartition(BinaryComparable key) {
return result;
}
}
private class SinglySplitTrieNode extends TrieNode {
final int lower;
final BinaryComparable mySplitPoint;
SinglySplitTrieNode(int level, BinaryComparable[] splitPoints, int lower) {
super(level);
this.lower = lower;
this.mySplitPoint = splitPoints[lower];
}
public int findPartition(BinaryComparable key) {
return lower + (key.compareTo(mySplitPoint) < 0 ? 0 : 1);
}
}
/**
* Read the cut points from the given IFile.
* @param fs The file system
* @param p The path to read
* @param keyClass The map output key class
* @param job The job config
* @throws IOException
*/
// matching key types enforced by passing in
@SuppressWarnings("unchecked") // map output key class
private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
Configuration conf) throws IOException {
SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
ArrayList<K> parts = new ArrayList<K>();
K key = ReflectionUtils.newInstance(keyClass, conf);
NullWritable value = NullWritable.get();
while (reader.next(key, value)) {
parts.add(key);
key = ReflectionUtils.newInstance(keyClass, conf);
}
reader.close();
return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
}
/**
*
* This object contains a TrieNodeRef if there is such a thing that
* can be repeated. Two adjacent trie node slots that contain no
* split points can be filled with the same trie node, even if they
* are not on the same level. See buildTreeRec, below.
*
*/
private class CarriedTrieNodeRef
{
TrieNode content;
CarriedTrieNodeRef() {
content = null;
}
}
/**
* Given a sorted set of cut points, build a trie that will find the correct
* partition quickly.
* @param splits the list of cut points
* @param lower the lower bound of partitions 0..numPartitions-1
* @param upper the upper bound of partitions 0..numPartitions-1
* @param prefix the prefix that we have already checked against
* @param maxDepth the maximum depth we will build a trie for
* @return the trie node that will divide the splits correctly
*/
private TrieNode buildTrie(BinaryComparable[] splits, int lower,
int upper, byte[] prefix, int maxDepth) {
return buildTrieRec
(splits, lower, upper, prefix, maxDepth, new CarriedTrieNodeRef());
}
/**
* This is the core of buildTrie. The interface, and stub, above, just adds
* an empty CarriedTrieNodeRef.
*
* We build trie nodes in depth first order, which is also in key space
* order. Every leaf node is referenced as a slot in a parent internal
* node. If two adjacent slots [in the DFO] hold leaf nodes that have
* no split point, then they are not separated by a split point either,
* because there's no place in key space for that split point to exist.
*
* When that happens, the leaf nodes would be semantically identical, and
* we reuse the object. A single CarriedTrieNodeRef "ref" lives for the
* duration of the tree-walk. ref carries a potentially reusable, unsplit
* leaf node for such reuse until a leaf node with a split arises, which
* breaks the chain until we need to make a new unsplit leaf node.
*
* Note that this use of CarriedTrieNodeRef means that for internal nodes,
* for internal nodes if this code is modified in any way we still need
* to make or fill in the subnodes in key space order.
*/
private TrieNode buildTrieRec(BinaryComparable[] splits, int lower,
int upper, byte[] prefix, int maxDepth, CarriedTrieNodeRef ref) {
final int depth = prefix.length;
// We generate leaves for a single split point as well as for
// no split points.
if (depth >= maxDepth || lower >= upper - 1) {
// If we have two consecutive requests for an unsplit trie node, we
// can deliver the same one the second time.
if (lower == upper && ref.content != null) {
return ref.content;
}
TrieNode result = LeafTrieNodeFactory(depth, splits, lower, upper);
ref.content = lower == upper ? result : null;
return result;
}
InnerTrieNode result = new InnerTrieNode(depth);
byte[] trial = Arrays.copyOf(prefix, prefix.length + 1);
// append an extra byte on to the prefix
int currentBound = lower;
for(int ch = 0; ch < 0xFF; ++ch) {
trial[depth] = (byte) (ch + 1);
lower = currentBound;
while (currentBound < upper) {
if (splits[currentBound].compareTo(trial, 0, trial.length) >= 0) {
break;
}
currentBound += 1;
}
trial[depth] = (byte) ch;
result.child[0xFF & ch]
= buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);
}
// pick up the rest
trial[depth] = (byte)0xFF;
result.child[0xFF]
= buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);
return result;
}
}

View File

@ -26,7 +26,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@ -43,11 +42,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.PerformanceEvaluation;
@ -74,7 +75,6 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -180,7 +180,7 @@ public class TestHFileOutputFormat {
try {
Job job = new Job(conf);
FileOutputFormat.setOutputPath(job, dir);
context = getTestTaskAttemptContext(job);
context = createTestTaskAttemptContext(job);
HFileOutputFormat hof = new HFileOutputFormat();
writer = hof.getRecordWriter(context);
final byte [] b = Bytes.toBytes("b");
@ -208,29 +208,10 @@ public class TestHFileOutputFormat {
}
}
/**
* @return True if the available mapreduce is post-0.20.
*/
private static boolean isPost020MapReduce() {
// Here is a coarse test for post 0.20 hadoop; TAC became an interface.
return TaskAttemptContext.class.isInterface();
}
private TaskAttemptContext getTestTaskAttemptContext(final Job job)
private TaskAttemptContext createTestTaskAttemptContext(final Job job)
throws IOException, Exception {
TaskAttemptContext context;
if (isPost020MapReduce()) {
TaskAttemptID id =
TaskAttemptID.forName("attempt_200707121733_0001_m_000000_0");
Class<?> clazz =
Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
Constructor<?> c = clazz.
getConstructor(Configuration.class, TaskAttemptID.class);
context = (TaskAttemptContext)c.newInstance(job.getConfiguration(), id);
} else {
context = org.apache.hadoop.hbase.mapreduce.hadoopbackport.InputSampler.
getTaskAttemptContext(job);
}
HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
TaskAttemptContext context = hadoop.createTestTaskAttemptContext(job, "attempt_200707121733_0001_m_000000_0");
return context;
}
@ -250,7 +231,7 @@ public class TestHFileOutputFormat {
// build a record writer using HFileOutputFormat
Job job = new Job(conf);
FileOutputFormat.setOutputPath(job, dir);
context = getTestTaskAttemptContext(job);
context = createTestTaskAttemptContext(job);
HFileOutputFormat hof = new HFileOutputFormat();
writer = hof.getRecordWriter(context);
@ -593,7 +574,7 @@ public class TestHFileOutputFormat {
setupRandomGeneratorMapper(job);
HFileOutputFormat.configureIncrementalLoad(job, table);
FileOutputFormat.setOutputPath(job, dir);
context = getTestTaskAttemptContext(job);
context = createTestTaskAttemptContext(job);
HFileOutputFormat hof = new HFileOutputFormat();
writer = hof.getRecordWriter(context);

14
pom.xml
View File

@ -879,11 +879,25 @@
<artifactId>hbase-hadoop-compat</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>${compat.module}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>${compat.module}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<artifactId>hbase-server</artifactId>
<groupId>org.apache.hbase</groupId>