diff --git a/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/HadoopShims.java b/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/HadoopShims.java
new file mode 100644
index 00000000000..88e4af9558d
--- /dev/null
+++ b/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/HadoopShims.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+
+/**
+ * A compatibility shim layer for interacting with different versions of Hadoop.
+ */
+//NOTE: we can move this under src/main if main code wants to use this shim layer
+public interface HadoopShims {
+
+  /**
+   * Returns a TaskAttemptContext instance created from the given parameters.
+   * @param job an instance of o.a.h.mapreduce.Job
+   * @param taskId an identifier for the task attempt id. Should be parsable by
+   * TaskAttemptId.forName()
+   * @return a concrete TaskAttemptContext instance of o.a.h.mapreduce.TaskAttemptContext
+   */
+  public <T, J> T createTestTaskAttemptContext(final J job, final String taskId);
+
+}
diff --git a/hbase-hadoop1-compat/pom.xml b/hbase-hadoop1-compat/pom.xml
index 9a54c5cc814..3fed0790b22 100644
--- a/hbase-hadoop1-compat/pom.xml
+++ b/hbase-hadoop1-compat/pom.xml
@@ -58,6 +58,12 @@ limitations under the License.
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-hadoop-compat</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-hadoop-compat</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-core</artifactId>
diff --git a/hbase-hadoop1-compat/src/test/java/org/apache/hadoop/hbase/HadoopShimsImpl.java b/hbase-hadoop1-compat/src/test/java/org/apache/hadoop/hbase/HadoopShimsImpl.java
new file mode 100644
index 00000000000..34ae1748b99
--- /dev/null
+++ b/hbase-hadoop1-compat/src/test/java/org/apache/hadoop/hbase/HadoopShimsImpl.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.hbase; + +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; + +/** + * Compatibility shim layer implementation for Hadoop-1. + */ +public class HadoopShimsImpl implements HadoopShims { + + /** + * Returns a TaskAttemptContext instance created from the given parameters. + * @param job an instance of o.a.h.mapreduce.Job + * @param taskId an identifier for the task attempt id. Should be parsable by + * TaskAttemptId.forName() + * @return a concrete TaskAttemptContext instance of o.a.h.mapreduce.TaskAttemptContext + */ + @Override + @SuppressWarnings("unchecked") + public T createTestTaskAttemptContext(J job, String taskId) { + Job j = (Job)job; + return (T)new TaskAttemptContext(j.getConfiguration(), TaskAttemptID.forName(taskId)); + } +} diff --git a/hbase-hadoop1-compat/src/test/resources/META-INF/services/org.apache.hadoop.hbase.HadoopShims b/hbase-hadoop1-compat/src/test/resources/META-INF/services/org.apache.hadoop.hbase.HadoopShims new file mode 100644 index 00000000000..af31e87e22f --- /dev/null +++ b/hbase-hadoop1-compat/src/test/resources/META-INF/services/org.apache.hadoop.hbase.HadoopShims @@ -0,0 +1 @@ +org.apache.hadoop.hbase.HadoopShimsImpl \ No newline at end of file diff --git a/hbase-hadoop2-compat/pom.xml b/hbase-hadoop2-compat/pom.xml index a3c307a1605..5a03e16b3d5 100644 --- a/hbase-hadoop2-compat/pom.xml +++ b/hbase-hadoop2-compat/pom.xml @@ -112,6 +112,12 @@ limitations under the License. org.apache.hbase hbase-hadoop-compat + + org.apache.hbase + hbase-hadoop-compat + test-jar + test + org.apache.hadoop hadoop-client diff --git a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/HadoopShimsImpl.java b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/HadoopShimsImpl.java new file mode 100644 index 00000000000..ce142e8b572 --- /dev/null +++ b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/HadoopShimsImpl.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +/** + * Compatibility shim layer implementation for Hadoop-2. + */ +public class HadoopShimsImpl implements HadoopShims { + + /** + * Returns a TaskAttemptContext instance created from the given parameters. + * @param job an instance of o.a.h.mapreduce.Job + * @param taskId an identifier for the task attempt id. 
Should be parsable by + * TaskAttemptId.forName() + * @return a concrete TaskAttemptContext instance of o.a.h.mapreduce.TaskAttemptContext + */ + @Override + @SuppressWarnings("unchecked") + public T createTestTaskAttemptContext(J job, String taskId) { + Job j = (Job)job; + return (T)new TaskAttemptContextImpl(j.getConfiguration(), TaskAttemptID.forName(taskId)); + } +} diff --git a/hbase-hadoop2-compat/src/test/resources/META-INF/services/org.apache.hadoop.hbase.HadoopShims b/hbase-hadoop2-compat/src/test/resources/META-INF/services/org.apache.hadoop.hbase.HadoopShims new file mode 100644 index 00000000000..af31e87e22f --- /dev/null +++ b/hbase-hadoop2-compat/src/test/resources/META-INF/services/org.apache.hadoop.hbase.HadoopShims @@ -0,0 +1 @@ +org.apache.hadoop.hbase.HadoopShimsImpl \ No newline at end of file diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml index d83cb253dfc..d9380d26205 100644 --- a/hbase-server/pom.xml +++ b/hbase-server/pom.xml @@ -266,10 +266,21 @@ org.apache.hbase hbase-hadoop-compat + + org.apache.hbase + hbase-hadoop-compat + test-jar + test + org.apache.hbase ${compat.module} - ${project.version} + + + org.apache.hbase + ${compat.module} + test-jar + test diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java index 715b4506510..3cbf328b876 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java @@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; @@ -62,11 +61,11 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; /** * Writes HFiles. Passed KeyValues must arrive in order. @@ -83,9 +82,9 @@ public class HFileOutputFormat extends FileOutputFormat getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException { // Get the path of the temporary output file @@ -121,7 +120,7 @@ public class HFileOutputFormat extends FileOutputFormat() { // Map of families to writers and how much has been output on the writer. 
private final Map writers = @@ -317,13 +316,8 @@ public class HFileOutputFormat extends FileOutputFormat topClass; - try { - topClass = getTotalOrderPartitionerClass(); - } catch (ClassNotFoundException e) { - throw new IOException("Failed getting TotalOrderPartitioner", e); - } - job.setPartitionerClass(topClass); + + job.setPartitionerClass(TotalOrderPartitioner.class); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(KeyValue.class); job.setOutputFormatClass(HFileOutputFormat.class); @@ -355,11 +349,7 @@ public class HFileOutputFormat extends FileOutputFormat hadoop 0.20, then we want to use the hadoop TotalOrderPartitioner. - * If 0.20, then we want to use the TOP that we have under hadoopbackport. - * This method is about hbase being able to run on different versions of - * hadoop. In 0.20.x hadoops, we have to use the TOP that is bundled with - * hbase. Otherwise, we use the one in Hadoop. - * @return Instance of the TotalOrderPartitioner class - * @throws ClassNotFoundException If can't find a TotalOrderPartitioner. - */ - private static Class getTotalOrderPartitionerClass() - throws ClassNotFoundException { - Class clazz = null; - try { - clazz = (Class) Class.forName("org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner"); - } catch (ClassNotFoundException e) { - clazz = - (Class) Class.forName("org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner"); - } - return clazz; - } - /** * Run inside the task to deserialize column family to compression algorithm * map from the diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java deleted file mode 100644 index 89c342837c7..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java +++ /dev/null @@ -1,443 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.mapreduce.hadoopbackport; - -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Random; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.RawComparator; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -/** - * Utility for collecting samples and writing a partition file for - * {@link TotalOrderPartitioner}. - * - * This is an identical copy of o.a.h.mapreduce.lib.partition.TotalOrderPartitioner - * from Hadoop trunk at r961542, with the exception of replacing - * TaskAttemptContextImpl with TaskAttemptContext. - */ -public class InputSampler extends Configured implements Tool { - - private static final Log LOG = LogFactory.getLog(InputSampler.class); - - static int printUsage() { - System.out.println("sampler -r \n" + - " [-inFormat ]\n" + - " [-keyClass ]\n" + - " [-splitRandom | " + - " // Sample from random splits at random (general)\n" + - " -splitSample | " + - " // Sample from first records in splits (random data)\n"+ - " -splitInterval ]" + - " // Sample from splits at intervals (sorted data)"); - System.out.println("Default sampler: -splitRandom 0.1 10000 10"); - ToolRunner.printGenericCommandUsage(System.out); - return -1; - } - - public InputSampler(Configuration conf) { - setConf(conf); - } - - /** - * Interface to sample using an - * {@link org.apache.hadoop.mapreduce.InputFormat}. - */ - public interface Sampler { - /** - * For a given job, collect and return a subset of the keys from the - * input data. - */ - K[] getSample(InputFormat inf, Job job) - throws IOException, InterruptedException; - } - - /** - * Samples the first n records from s splits. - * Inexpensive way to sample random data. - */ - public static class SplitSampler implements Sampler { - - private final int numSamples; - private final int maxSplitsSampled; - - /** - * Create a SplitSampler sampling all splits. - * Takes the first numSamples / numSplits records from each split. - * @param numSamples Total number of samples to obtain from all selected - * splits. - */ - public SplitSampler(int numSamples) { - this(numSamples, Integer.MAX_VALUE); - } - - /** - * Create a new SplitSampler. - * @param numSamples Total number of samples to obtain from all selected - * splits. - * @param maxSplitsSampled The maximum number of splits to examine. - */ - public SplitSampler(int numSamples, int maxSplitsSampled) { - this.numSamples = numSamples; - this.maxSplitsSampled = maxSplitsSampled; - } - - /** - * From each split sampled, take the first numSamples / numSplits records. 
- */ - @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type - public K[] getSample(InputFormat inf, Job job) - throws IOException, InterruptedException { - List splits = inf.getSplits(job); - ArrayList samples = new ArrayList(numSamples); - int splitsToSample = Math.min(maxSplitsSampled, splits.size()); - int samplesPerSplit = numSamples / splitsToSample; - long records = 0; - for (int i = 0; i < splitsToSample; ++i) { - TaskAttemptContext samplingContext = getTaskAttemptContext(job); - RecordReader reader = inf.createRecordReader( - splits.get(i), samplingContext); - reader.initialize(splits.get(i), samplingContext); - while (reader.nextKeyValue()) { - samples.add(ReflectionUtils.copy(job.getConfiguration(), - reader.getCurrentKey(), null)); - ++records; - if ((i+1) * samplesPerSplit <= records) { - break; - } - } - reader.close(); - } - return (K[])samples.toArray(); - } - } - - /** - * This method is about making hbase portable, making it so it can run on - * more than just hadoop 0.20. In later hadoops, TaskAttemptContext became - * an Interface. But in hadoops where TAC is an Interface, we shouldn't - * be using the classes that are in this package; we should be using the - * native Hadoop ones (We'll throw a ClassNotFoundException if end up in - * here when we should be using native hadoop TotalOrderPartitioner). - * @param job - * @return Context - * @throws IOException - */ - public static TaskAttemptContext getTaskAttemptContext(final Job job) - throws IOException { - Constructor c; - try { - c = TaskAttemptContext.class.getConstructor(Configuration.class, TaskAttemptID.class); - } catch (Exception e) { - throw new IOException("Failed getting constructor", e); - } - try { - return c.newInstance(job.getConfiguration(), new TaskAttemptID()); - } catch (Exception e) { - throw new IOException("Failed creating instance", e); - } - } - - /** - * Sample from random points in the input. - * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from - * each split. - */ - public static class RandomSampler implements Sampler { - private double freq; - private final int numSamples; - private final int maxSplitsSampled; - - /** - * Create a new RandomSampler sampling all splits. - * This will read every split at the client, which is very expensive. - * @param freq Probability with which a key will be chosen. - * @param numSamples Total number of samples to obtain from all selected - * splits. - */ - public RandomSampler(double freq, int numSamples) { - this(freq, numSamples, Integer.MAX_VALUE); - } - - /** - * Create a new RandomSampler. - * @param freq Probability with which a key will be chosen. - * @param numSamples Total number of samples to obtain from all selected - * splits. - * @param maxSplitsSampled The maximum number of splits to examine. - */ - public RandomSampler(double freq, int numSamples, int maxSplitsSampled) { - this.freq = freq; - this.numSamples = numSamples; - this.maxSplitsSampled = maxSplitsSampled; - } - - /** - * Randomize the split order, then take the specified number of keys from - * each split sampled, where each key is selected with the specified - * probability and possibly replaced by a subsequently selected key when - * the quota of keys from that split is satisfied. 
- */ - @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type - public K[] getSample(InputFormat inf, Job job) - throws IOException, InterruptedException { - List splits = inf.getSplits(job); - ArrayList samples = new ArrayList(numSamples); - int splitsToSample = Math.min(maxSplitsSampled, splits.size()); - - Random r = new Random(); - long seed = r.nextLong(); - r.setSeed(seed); - LOG.debug("seed: " + seed); - // shuffle splits - for (int i = 0; i < splits.size(); ++i) { - InputSplit tmp = splits.get(i); - int j = r.nextInt(splits.size()); - splits.set(i, splits.get(j)); - splits.set(j, tmp); - } - // our target rate is in terms of the maximum number of sample splits, - // but we accept the possibility of sampling additional splits to hit - // the target sample keyset - for (int i = 0; i < splitsToSample || - (i < splits.size() && samples.size() < numSamples); ++i) { - TaskAttemptContext samplingContext = getTaskAttemptContext(job); - RecordReader reader = inf.createRecordReader( - splits.get(i), samplingContext); - reader.initialize(splits.get(i), samplingContext); - while (reader.nextKeyValue()) { - if (r.nextDouble() <= freq) { - if (samples.size() < numSamples) { - samples.add(ReflectionUtils.copy(job.getConfiguration(), - reader.getCurrentKey(), null)); - } else { - // When exceeding the maximum number of samples, replace a - // random element with this one, then adjust the frequency - // to reflect the possibility of existing elements being - // pushed out - int ind = r.nextInt(numSamples); - if (ind != numSamples) { - samples.set(ind, ReflectionUtils.copy(job.getConfiguration(), - reader.getCurrentKey(), null)); - } - freq *= (numSamples - 1) / (double) numSamples; - } - } - } - reader.close(); - } - return (K[])samples.toArray(); - } - } - - /** - * Sample from s splits at regular intervals. - * Useful for sorted data. - */ - public static class IntervalSampler implements Sampler { - private final double freq; - private final int maxSplitsSampled; - - /** - * Create a new IntervalSampler sampling all splits. - * @param freq The frequency with which records will be emitted. - */ - public IntervalSampler(double freq) { - this(freq, Integer.MAX_VALUE); - } - - /** - * Create a new IntervalSampler. - * @param freq The frequency with which records will be emitted. - * @param maxSplitsSampled The maximum number of splits to examine. - * @see #getSample - */ - public IntervalSampler(double freq, int maxSplitsSampled) { - this.freq = freq; - this.maxSplitsSampled = maxSplitsSampled; - } - - /** - * For each split sampled, emit when the ratio of the number of records - * retained to the total record count is less than the specified - * frequency. 
- */ - @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type - public K[] getSample(InputFormat inf, Job job) - throws IOException, InterruptedException { - List splits = inf.getSplits(job); - ArrayList samples = new ArrayList(); - int splitsToSample = Math.min(maxSplitsSampled, splits.size()); - long records = 0; - long kept = 0; - for (int i = 0; i < splitsToSample; ++i) { - TaskAttemptContext samplingContext = getTaskAttemptContext(job); - RecordReader reader = inf.createRecordReader( - splits.get(i), samplingContext); - reader.initialize(splits.get(i), samplingContext); - while (reader.nextKeyValue()) { - ++records; - if ((double) kept / records < freq) { - samples.add(ReflectionUtils.copy(job.getConfiguration(), - reader.getCurrentKey(), null)); - ++kept; - } - } - reader.close(); - } - return (K[])samples.toArray(); - } - } - - /** - * Write a partition file for the given job, using the Sampler provided. - * Queries the sampler for a sample keyset, sorts by the output key - * comparator, selects the keys for each rank, and writes to the destination - * returned from {@link TotalOrderPartitioner#getPartitionFile}. - */ - @SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator - public static void writePartitionFile(Job job, Sampler sampler) - throws IOException, ClassNotFoundException, InterruptedException { - Configuration conf = job.getConfiguration(); - final InputFormat inf = - ReflectionUtils.newInstance(job.getInputFormatClass(), conf); - int numPartitions = job.getNumReduceTasks(); - K[] samples = sampler.getSample(inf, job); - LOG.info("Using " + samples.length + " samples"); - RawComparator comparator = - (RawComparator) job.getSortComparator(); - Arrays.sort(samples, comparator); - Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf)); - FileSystem fs = dst.getFileSystem(conf); - if (fs.exists(dst)) { - fs.delete(dst, false); - } - SequenceFile.Writer writer = SequenceFile.createWriter(fs, - conf, dst, job.getMapOutputKeyClass(), NullWritable.class); - NullWritable nullValue = NullWritable.get(); - float stepSize = samples.length / (float) numPartitions; - int last = -1; - for(int i = 1; i < numPartitions; ++i) { - int k = Math.round(stepSize * i); - while (last >= k && comparator.compare(samples[last], samples[k]) == 0) { - ++k; - } - writer.append(samples[k], nullValue); - last = k; - } - writer.close(); - } - - /** - * Driver for InputSampler from the command line. - * Configures a JobConf instance and calls {@link #writePartitionFile}. 
- */ - public int run(String[] args) throws Exception { - Job job = new Job(getConf()); - ArrayList otherArgs = new ArrayList(); - Sampler sampler = null; - for(int i=0; i < args.length; ++i) { - try { - if ("-r".equals(args[i])) { - job.setNumReduceTasks(Integer.parseInt(args[++i])); - } else if ("-inFormat".equals(args[i])) { - job.setInputFormatClass( - Class.forName(args[++i]).asSubclass(InputFormat.class)); - } else if ("-keyClass".equals(args[i])) { - job.setMapOutputKeyClass( - Class.forName(args[++i]).asSubclass(WritableComparable.class)); - } else if ("-splitSample".equals(args[i])) { - int numSamples = Integer.parseInt(args[++i]); - int maxSplits = Integer.parseInt(args[++i]); - if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE; - sampler = new SplitSampler(numSamples, maxSplits); - } else if ("-splitRandom".equals(args[i])) { - double pcnt = Double.parseDouble(args[++i]); - int numSamples = Integer.parseInt(args[++i]); - int maxSplits = Integer.parseInt(args[++i]); - if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE; - sampler = new RandomSampler(pcnt, numSamples, maxSplits); - } else if ("-splitInterval".equals(args[i])) { - double pcnt = Double.parseDouble(args[++i]); - int maxSplits = Integer.parseInt(args[++i]); - if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE; - sampler = new IntervalSampler(pcnt, maxSplits); - } else { - otherArgs.add(args[i]); - } - } catch (NumberFormatException except) { - System.out.println("ERROR: Integer expected instead of " + args[i]); - return printUsage(); - } catch (ArrayIndexOutOfBoundsException except) { - System.out.println("ERROR: Required parameter missing from " + - args[i-1]); - return printUsage(); - } - } - if (job.getNumReduceTasks() <= 1) { - System.err.println("Sampler requires more than one reducer"); - return printUsage(); - } - if (otherArgs.size() < 2) { - System.out.println("ERROR: Wrong number of parameters: "); - return printUsage(); - } - if (null == sampler) { - sampler = new RandomSampler(0.1, 10000, 10); - } - - Path outf = new Path(otherArgs.remove(otherArgs.size() - 1)); - TotalOrderPartitioner.setPartitionFile(getConf(), outf); - for (String s : otherArgs) { - FileInputFormat.addInputPath(job, new Path(s)); - } - InputSampler.writePartitionFile(job, sampler); - - return 0; - } - - public static void main(String[] args) throws Exception { - InputSampler sampler = new InputSampler(new Configuration()); - int res = ToolRunner.run(sampler, args); - System.exit(res); - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TotalOrderPartitioner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TotalOrderPartitioner.java deleted file mode 100644 index 065e844e5b5..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TotalOrderPartitioner.java +++ /dev/null @@ -1,401 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce.hadoopbackport; - -import java.io.IOException; -import java.lang.reflect.Array; -import java.util.ArrayList; -import java.util.Arrays; - -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BinaryComparable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.RawComparator; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Partitioner; -import org.apache.hadoop.util.ReflectionUtils; - -/** - * Partitioner effecting a total order by reading split points from - * an externally generated source. - * - * This is an identical copy of o.a.h.mapreduce.lib.partition.TotalOrderPartitioner - * from Hadoop trunk at r910774. - */ -public class TotalOrderPartitioner,V> - extends Partitioner implements Configurable { - - private Node partitions; - public static final String DEFAULT_PATH = "_partition.lst"; - public static final String PARTITIONER_PATH = - "mapreduce.totalorderpartitioner.path"; - public static final String MAX_TRIE_DEPTH = - "mapreduce.totalorderpartitioner.trie.maxdepth"; - public static final String NATURAL_ORDER = - "mapreduce.totalorderpartitioner.naturalorder"; - Configuration conf; - - public TotalOrderPartitioner() { } - - /** - * Read in the partition file and build indexing data structures. - * If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and - * total.order.partitioner.natural.order is not false, a trie - * of the first total.order.partitioner.max.trie.depth(2) + 1 bytes - * will be built. Otherwise, keys will be located using a binary search of - * the partition keyset using the {@link org.apache.hadoop.io.RawComparator} - * defined for this job. The input file must be sorted with the same - * comparator and contain {@link Job#getNumReduceTasks()} - 1 keys. - */ - @SuppressWarnings("unchecked") // keytype from conf not static - public void setConf(Configuration conf) { - try { - this.conf = conf; - String parts = getPartitionFile(conf); - final Path partFile = new Path(parts); - final FileSystem fs = (DEFAULT_PATH.equals(parts)) - ? 
FileSystem.getLocal(conf) // assume in DistributedCache - : partFile.getFileSystem(conf); - - Job job = new Job(conf); - Class keyClass = (Class)job.getMapOutputKeyClass(); - K[] splitPoints = readPartitions(fs, partFile, keyClass, conf); - if (splitPoints.length != job.getNumReduceTasks() - 1) { - throw new IOException("Wrong number of partitions in keyset:" - + splitPoints.length); - } - RawComparator comparator = - (RawComparator) job.getSortComparator(); - for (int i = 0; i < splitPoints.length - 1; ++i) { - if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) { - throw new IOException("Split points are out of order"); - } - } - boolean natOrder = - conf.getBoolean(NATURAL_ORDER, true); - if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) { - partitions = buildTrie((BinaryComparable[])splitPoints, 0, - splitPoints.length, new byte[0], - // Now that blocks of identical splitless trie nodes are - // represented reentrantly, and we develop a leaf for any trie - // node with only one split point, the only reason for a depth - // limit is to refute stack overflow or bloat in the pathological - // case where the split points are long and mostly look like bytes - // iii...iixii...iii . Therefore, we make the default depth - // limit large but not huge. - conf.getInt(MAX_TRIE_DEPTH, 200)); - } else { - partitions = new BinarySearchNode(splitPoints, comparator); - } - } catch (IOException e) { - throw new IllegalArgumentException("Can't read partitions file", e); - } - } - - public Configuration getConf() { - return conf; - } - - // by construction, we know if our keytype - @SuppressWarnings("unchecked") // is memcmp-able and uses the trie - public int getPartition(K key, V value, int numPartitions) { - return partitions.findPartition(key); - } - - /** - * Set the path to the SequenceFile storing the sorted partition keyset. - * It must be the case that for R reduces, there are R-1 - * keys in the SequenceFile. - */ - public static void setPartitionFile(Configuration conf, Path p) { - conf.set(PARTITIONER_PATH, p.toString()); - } - - /** - * Get the path to the SequenceFile storing the sorted partition keyset. - * @see #setPartitionFile(Configuration, Path) - */ - public static String getPartitionFile(Configuration conf) { - return conf.get(PARTITIONER_PATH, DEFAULT_PATH); - } - - /** - * Interface to the partitioner to locate a key in the partition keyset. - */ - interface Node { - /** - * Locate partition in keyset K, st [Ki..Ki+1) defines a partition, - * with implicit K0 = -inf, Kn = +inf, and |K| = #partitions - 1. - */ - int findPartition(T key); - } - - /** - * Base class for trie nodes. If the keytype is memcomp-able, this builds - * tries of the first total.order.partitioner.max.trie.depth - * bytes. - */ - static abstract class TrieNode implements Node { - private final int level; - TrieNode(int level) { - this.level = level; - } - int getLevel() { - return level; - } - } - - /** - * For types that are not {@link org.apache.hadoop.io.BinaryComparable} or - * where disabled by total.order.partitioner.natural.order, - * search the partition keyset with a binary search. - */ - class BinarySearchNode implements Node { - private final K[] splitPoints; - private final RawComparator comparator; - BinarySearchNode(K[] splitPoints, RawComparator comparator) { - this.splitPoints = splitPoints; - this.comparator = comparator; - } - public int findPartition(K key) { - final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1; - return (pos < 0) ? 
-pos : pos; - } - } - - /** - * An inner trie node that contains 256 children based on the next - * character. - */ - class InnerTrieNode extends TrieNode { - private TrieNode[] child = new TrieNode[256]; - - InnerTrieNode(int level) { - super(level); - } - public int findPartition(BinaryComparable key) { - int level = getLevel(); - if (key.getLength() <= level) { - return child[0].findPartition(key); - } - return child[0xFF & key.getBytes()[level]].findPartition(key); - } - } - - /** - * @param level the tree depth at this node - * @param splitPoints the full split point vector, which holds - * the split point or points this leaf node - * should contain - * @param lower first INcluded element of splitPoints - * @param upper first EXcluded element of splitPoints - * @return a leaf node. They come in three kinds: no split points - * [and the findParttion returns a canned index], one split - * point [and we compare with a single comparand], or more - * than one [and we do a binary search]. The last case is - * rare. - */ - private TrieNode LeafTrieNodeFactory - (int level, BinaryComparable[] splitPoints, int lower, int upper) { - switch (upper - lower) { - case 0: - return new UnsplitTrieNode(level, lower); - - case 1: - return new SinglySplitTrieNode(level, splitPoints, lower); - - default: - return new LeafTrieNode(level, splitPoints, lower, upper); - } - } - - /** - * A leaf trie node that scans for the key between lower..upper. - * - * We don't generate many of these now, since we usually continue trie-ing - * when more than one split point remains at this level. and we make different - * objects for nodes with 0 or 1 split point. - */ - private class LeafTrieNode extends TrieNode { - final int lower; - final int upper; - final BinaryComparable[] splitPoints; - LeafTrieNode(int level, BinaryComparable[] splitPoints, int lower, int upper) { - super(level); - this.lower = lower; - this.upper = upper; - this.splitPoints = splitPoints; - } - public int findPartition(BinaryComparable key) { - final int pos = Arrays.binarySearch(splitPoints, lower, upper, key) + 1; - return (pos < 0) ? -pos : pos; - } - } - - private class UnsplitTrieNode extends TrieNode { - final int result; - - UnsplitTrieNode(int level, int value) { - super(level); - this.result = value; - } - - public int findPartition(BinaryComparable key) { - return result; - } - } - - private class SinglySplitTrieNode extends TrieNode { - final int lower; - final BinaryComparable mySplitPoint; - - SinglySplitTrieNode(int level, BinaryComparable[] splitPoints, int lower) { - super(level); - this.lower = lower; - this.mySplitPoint = splitPoints[lower]; - } - - public int findPartition(BinaryComparable key) { - return lower + (key.compareTo(mySplitPoint) < 0 ? 0 : 1); - } - } - - - /** - * Read the cut points from the given IFile. 
- * @param fs The file system - * @param p The path to read - * @param keyClass The map output key class - * @param job The job config - * @throws IOException - */ - // matching key types enforced by passing in - @SuppressWarnings("unchecked") // map output key class - private K[] readPartitions(FileSystem fs, Path p, Class keyClass, - Configuration conf) throws IOException { - SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf); - ArrayList parts = new ArrayList(); - K key = ReflectionUtils.newInstance(keyClass, conf); - NullWritable value = NullWritable.get(); - while (reader.next(key, value)) { - parts.add(key); - key = ReflectionUtils.newInstance(keyClass, conf); - } - reader.close(); - return parts.toArray((K[])Array.newInstance(keyClass, parts.size())); - } - - /** - * - * This object contains a TrieNodeRef if there is such a thing that - * can be repeated. Two adjacent trie node slots that contain no - * split points can be filled with the same trie node, even if they - * are not on the same level. See buildTreeRec, below. - * - */ - private class CarriedTrieNodeRef - { - TrieNode content; - - CarriedTrieNodeRef() { - content = null; - } - } - - - /** - * Given a sorted set of cut points, build a trie that will find the correct - * partition quickly. - * @param splits the list of cut points - * @param lower the lower bound of partitions 0..numPartitions-1 - * @param upper the upper bound of partitions 0..numPartitions-1 - * @param prefix the prefix that we have already checked against - * @param maxDepth the maximum depth we will build a trie for - * @return the trie node that will divide the splits correctly - */ - private TrieNode buildTrie(BinaryComparable[] splits, int lower, - int upper, byte[] prefix, int maxDepth) { - return buildTrieRec - (splits, lower, upper, prefix, maxDepth, new CarriedTrieNodeRef()); - } - - /** - * This is the core of buildTrie. The interface, and stub, above, just adds - * an empty CarriedTrieNodeRef. - * - * We build trie nodes in depth first order, which is also in key space - * order. Every leaf node is referenced as a slot in a parent internal - * node. If two adjacent slots [in the DFO] hold leaf nodes that have - * no split point, then they are not separated by a split point either, - * because there's no place in key space for that split point to exist. - * - * When that happens, the leaf nodes would be semantically identical, and - * we reuse the object. A single CarriedTrieNodeRef "ref" lives for the - * duration of the tree-walk. ref carries a potentially reusable, unsplit - * leaf node for such reuse until a leaf node with a split arises, which - * breaks the chain until we need to make a new unsplit leaf node. - * - * Note that this use of CarriedTrieNodeRef means that for internal nodes, - * for internal nodes if this code is modified in any way we still need - * to make or fill in the subnodes in key space order. - */ - private TrieNode buildTrieRec(BinaryComparable[] splits, int lower, - int upper, byte[] prefix, int maxDepth, CarriedTrieNodeRef ref) { - final int depth = prefix.length; - // We generate leaves for a single split point as well as for - // no split points. - if (depth >= maxDepth || lower >= upper - 1) { - // If we have two consecutive requests for an unsplit trie node, we - // can deliver the same one the second time. - if (lower == upper && ref.content != null) { - return ref.content; - } - TrieNode result = LeafTrieNodeFactory(depth, splits, lower, upper); - ref.content = lower == upper ? 
result : null; - return result; - } - InnerTrieNode result = new InnerTrieNode(depth); - byte[] trial = Arrays.copyOf(prefix, prefix.length + 1); - // append an extra byte on to the prefix - int currentBound = lower; - for(int ch = 0; ch < 0xFF; ++ch) { - trial[depth] = (byte) (ch + 1); - lower = currentBound; - while (currentBound < upper) { - if (splits[currentBound].compareTo(trial, 0, trial.length) >= 0) { - break; - } - currentBound += 1; - } - trial[depth] = (byte) ch; - result.child[0xFF & ch] - = buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref); - } - // pick up the rest - trial[depth] = (byte)0xFF; - result.child[0xFF] - = buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref); - - return result; - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java index a74aede2b82..8ad9ce0b856 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java @@ -26,7 +26,6 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; -import java.lang.reflect.Constructor; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -43,11 +42,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HadoopShims; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.LargeTests; import org.apache.hadoop.hbase.PerformanceEvaluation; @@ -74,7 +75,6 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -180,7 +180,7 @@ public class TestHFileOutputFormat { try { Job job = new Job(conf); FileOutputFormat.setOutputPath(job, dir); - context = getTestTaskAttemptContext(job); + context = createTestTaskAttemptContext(job); HFileOutputFormat hof = new HFileOutputFormat(); writer = hof.getRecordWriter(context); final byte [] b = Bytes.toBytes("b"); @@ -208,29 +208,10 @@ public class TestHFileOutputFormat { } } - /** - * @return True if the available mapreduce is post-0.20. - */ - private static boolean isPost020MapReduce() { - // Here is a coarse test for post 0.20 hadoop; TAC became an interface. 
- return TaskAttemptContext.class.isInterface(); - } - - private TaskAttemptContext getTestTaskAttemptContext(final Job job) + private TaskAttemptContext createTestTaskAttemptContext(final Job job) throws IOException, Exception { - TaskAttemptContext context; - if (isPost020MapReduce()) { - TaskAttemptID id = - TaskAttemptID.forName("attempt_200707121733_0001_m_000000_0"); - Class clazz = - Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl"); - Constructor c = clazz. - getConstructor(Configuration.class, TaskAttemptID.class); - context = (TaskAttemptContext)c.newInstance(job.getConfiguration(), id); - } else { - context = org.apache.hadoop.hbase.mapreduce.hadoopbackport.InputSampler. - getTaskAttemptContext(job); - } + HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class); + TaskAttemptContext context = hadoop.createTestTaskAttemptContext(job, "attempt_200707121733_0001_m_000000_0"); return context; } @@ -250,7 +231,7 @@ public class TestHFileOutputFormat { // build a record writer using HFileOutputFormat Job job = new Job(conf); FileOutputFormat.setOutputPath(job, dir); - context = getTestTaskAttemptContext(job); + context = createTestTaskAttemptContext(job); HFileOutputFormat hof = new HFileOutputFormat(); writer = hof.getRecordWriter(context); @@ -593,7 +574,7 @@ public class TestHFileOutputFormat { setupRandomGeneratorMapper(job); HFileOutputFormat.configureIncrementalLoad(job, table); FileOutputFormat.setOutputPath(job, dir); - context = getTestTaskAttemptContext(job); + context = createTestTaskAttemptContext(job); HFileOutputFormat hof = new HFileOutputFormat(); writer = hof.getRecordWriter(context); diff --git a/pom.xml b/pom.xml index b2610a85442..b8ac209ccb6 100644 --- a/pom.xml +++ b/pom.xml @@ -879,11 +879,25 @@ hbase-hadoop-compat ${project.version} + + org.apache.hbase + hbase-hadoop-compat + ${project.version} + test-jar + test + org.apache.hbase ${compat.module} ${project.version} + + org.apache.hbase + ${compat.module} + ${project.version} + test-jar + test + hbase-server org.apache.hbase
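
Note (not part of the patch itself): a minimal sketch of how the shim introduced above is meant to be consumed, mirroring what TestHFileOutputFormat now does. It assumes CompatibilitySingletonFactory resolves HadoopShims to whichever HadoopShimsImpl is registered under META-INF/services/org.apache.hadoop.hbase.HadoopShims in the compat test-jar on the classpath (e.g. via java.util.ServiceLoader); the wrapper class below is hypothetical, for illustration only.

    import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
    import org.apache.hadoop.hbase.HadoopShims;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    // Hypothetical helper showing the intended call pattern for test code.
    public class ShimUsageSketch {
      public static TaskAttemptContext newTestContext(Job job) {
        // The concrete class behind HadoopShims comes from whichever
        // hbase-hadoop1-compat or hbase-hadoop2-compat test-jar is on the
        // classpath (selected by ${compat.module} in hbase-server/pom.xml);
        // callers never reference a version-specific TaskAttemptContext or
        // TaskAttemptContextImpl directly.
        HadoopShims shims = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
        TaskAttemptContext context =
            shims.createTestTaskAttemptContext(job, "attempt_200707121733_0001_m_000000_0");
        return context;
      }
    }

With the version differences isolated behind the compat modules this way, the reflection-based helpers (isPost020MapReduce(), getTestTaskAttemptContext(), getTotalOrderPartitionerClass()) and the hadoopbackport copies of TotalOrderPartitioner and InputSampler are no longer needed, which is why this patch deletes them.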