HBASE-3392. Update backport of InputSampler to reflect MAPREDUCE-1820

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1054845 13f79535-47bb-0310-9956-ffa450edef68
Author: Todd Lipcon
Date:   2011-01-04 00:52:06 +00:00
Parent: b99c65fa13
Commit: 46a56a74db

2 changed files with 24 additions and 17 deletions

CHANGES.txt

@@ -1297,6 +1297,7 @@ Release 0.90.0 - Unreleased
    HBASE-2467  Concurrent flushers in HLog sync using HDFS-895
    HBASE-3349  Pass HBase configuration to HttpServer
    HBASE-3372  HRS shouldn't print a full stack for ServerNotRunningException
+   HBASE-3392  Update backport of InputSampler to reflect MAPREDUCE-1820
 NEW FEATURES

InputSampler.java

@@ -51,7 +51,7 @@ import org.apache.hadoop.util.ToolRunner;
  * {@link TotalOrderPartitioner}.
  *
  * This is an identical copy of o.a.h.mapreduce.lib.partition.TotalOrderPartitioner
- * from Hadoop trunk at r910774, with the exception of replacing
+ * from Hadoop trunk at r961542, with the exception of replacing
  * TaskAttemptContextImpl with TaskAttemptContext.
  */
 public class InputSampler<K,V> extends Configured implements Tool {
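
The replacement noted in that comment is the essence of the backport: on Hadoop trunk,
TaskAttemptContext became an interface whose concrete class is TaskAttemptContextImpl,
while the Hadoop line HBase 0.90 builds against still ships TaskAttemptContext as an
instantiable class. A minimal sketch of the substitution, assuming that older mapreduce
API (the wrapper class and method name below are illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;

    public class SamplingContextSketch {
      // Builds the throwaway context the samplers hand to createRecordReader()
      // and initialize(). On Hadoop trunk this would read
      //   new TaskAttemptContextImpl(conf, new TaskAttemptID());
      // here TaskAttemptContext itself is constructed, since it is still a
      // concrete class in the Hadoop version this backport targets.
      static TaskAttemptContext samplingContext(Configuration conf) {
        return new TaskAttemptContext(conf, new TaskAttemptID());
      }
    }
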
@@ -63,7 +63,7 @@ public class InputSampler<K,V> extends Configured implements Tool {
       " [-inFormat <input format class>]\n" +
       " [-keyClass <map input & output key class>]\n" +
       " [-splitRandom <double pcnt> <numSamples> <maxsplits> | " +
-      "// Sample from random splits at random (general)\n" +
+      " // Sample from random splits at random (general)\n" +
       " -splitSample <numSamples> <maxsplits> | " +
       " // Sample from first records in splits (random data)\n"+
       " -splitInterval <double pcnt> <maxsplits>]" +
@@ -129,16 +129,17 @@ public class InputSampler<K,V> extends Configured implements Tool {
       List<InputSplit> splits = inf.getSplits(job);
       ArrayList<K> samples = new ArrayList<K>(numSamples);
       int splitsToSample = Math.min(maxSplitsSampled, splits.size());
-      int splitStep = splits.size() / splitsToSample;
       int samplesPerSplit = numSamples / splitsToSample;
       long records = 0;
       for (int i = 0; i < splitsToSample; ++i) {
+        TaskAttemptContext samplingContext = new TaskAttemptContext(
+            job.getConfiguration(), new TaskAttemptID());
         RecordReader<K,V> reader = inf.createRecordReader(
-            splits.get(i * splitStep),
-            new TaskAttemptContext(job.getConfiguration(),
-                new TaskAttemptID()));
+            splits.get(i), samplingContext);
+        reader.initialize(splits.get(i), samplingContext);
         while (reader.nextKeyValue()) {
-          samples.add(reader.getCurrentKey());
+          samples.add(ReflectionUtils.copy(job.getConfiguration(),
+              reader.getCurrentKey(), null));
           ++records;
           if ((i+1) * samplesPerSplit <= records) {
             break;
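
This hunk makes three related changes: the splitStep stride is dropped, so the sampler
now reads the first splitsToSample splits directly; the reader is explicitly
initialize()d before nextKeyValue() is called, which the mapreduce RecordReader contract
requires and which the framework only does for readers running inside a real task; and
keys returned by getCurrentKey() are deep-copied with ReflectionUtils.copy, because many
RecordReaders reuse a single key object across calls, so storing the bare reference
would leave the sample list full of aliases of the last key read. A self-contained
sketch of that last point, using a Text key and made-up row values as stand-ins (none of
this snippet is from the patch):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.util.ReflectionUtils;

    public class KeyCopySketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Stand-in for the single key instance a RecordReader typically reuses.
        Text currentKey = new Text("row-0001");

        // Keeping the reference aliases the reader's mutable key ...
        Text aliased = currentKey;
        // ... while ReflectionUtils.copy(conf, src, null) serializes the key and
        // deserializes it into a freshly allocated instance, so later mutations
        // of the reader's key do not rewrite the stored sample.
        Text snapshot = ReflectionUtils.copy(conf, currentKey, null);

        currentKey.set("row-0002");   // the reader advances to the next record
        System.out.println(aliased);  // prints "row-0002" (clobbered)
        System.out.println(snapshot); // still prints "row-0001"
      }
    }
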
@@ -213,13 +214,16 @@ public class InputSampler<K,V> extends Configured implements Tool {
       // the target sample keyset
       for (int i = 0; i < splitsToSample ||
            (i < splits.size() && samples.size() < numSamples); ++i) {
-        RecordReader<K,V> reader = inf.createRecordReader(splits.get(i),
-            new TaskAttemptContext(job.getConfiguration(),
-                new TaskAttemptID()));
+        TaskAttemptContext samplingContext = new TaskAttemptContext(
+            job.getConfiguration(), new TaskAttemptID());
+        RecordReader<K,V> reader = inf.createRecordReader(
+            splits.get(i), samplingContext);
+        reader.initialize(splits.get(i), samplingContext);
         while (reader.nextKeyValue()) {
           if (r.nextDouble() <= freq) {
             if (samples.size() < numSamples) {
-              samples.add(reader.getCurrentKey());
+              samples.add(ReflectionUtils.copy(job.getConfiguration(),
+                  reader.getCurrentKey(), null));
             } else {
               // When exceeding the maximum number of samples, replace a
               // random element with this one, then adjust the frequency
@@ -227,7 +231,8 @@ public class InputSampler<K,V> extends Configured implements Tool {
               // pushed out
               int ind = r.nextInt(numSamples);
               if (ind != numSamples) {
-                samples.set(ind, reader.getCurrentKey());
+                samples.set(ind, ReflectionUtils.copy(job.getConfiguration(),
+                    reader.getCurrentKey(), null));
               }
               freq *= (numSamples - 1) / (double) numSamples;
             }
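
The replacement branch above is what keeps the sample bounded at numSamples keys: once
the list is full, each newly accepted key overwrites a uniformly chosen slot, and the
acceptance frequency is damped by (numSamples - 1) / numSamples to reflect that an
existing element may have been pushed out, so records seen late in the scan do not
dominate the fixed-size sample. A compact sketch of that policy in isolation, with
hypothetical names and plain String keys standing in for the copied Writable keys:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    public class BoundedRandomSampleSketch {
      private final int numSamples;
      private final List<String> samples;
      private final Random rand = new Random();
      private double freq;

      BoundedRandomSampleSketch(double freq, int numSamples) {
        this.freq = freq;
        this.numSamples = numSamples;
        this.samples = new ArrayList<String>(numSamples);
      }

      // Mirrors the accept/replace logic of RandomSampler.getSample(): accept a
      // key with probability freq; once the buffer is full, overwrite a random
      // slot and damp freq so later records are accepted less often.
      void offer(String key) {
        if (rand.nextDouble() > freq) {
          return;                      // rejected by the current frequency
        }
        if (samples.size() < numSamples) {
          samples.add(key);            // still filling the buffer
        } else {
          samples.set(rand.nextInt(numSamples), key);
          freq *= (numSamples - 1) / (double) numSamples;
        }
      }
    }
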
@@ -277,19 +282,20 @@ public class InputSampler<K,V> extends Configured implements Tool {
       List<InputSplit> splits = inf.getSplits(job);
       ArrayList<K> samples = new ArrayList<K>();
       int splitsToSample = Math.min(maxSplitsSampled, splits.size());
-      int splitStep = splits.size() / splitsToSample;
       long records = 0;
       long kept = 0;
       for (int i = 0; i < splitsToSample; ++i) {
+        TaskAttemptContext samplingContext = new TaskAttemptContext(
+            job.getConfiguration(), new TaskAttemptID());
         RecordReader<K,V> reader = inf.createRecordReader(
-            splits.get(i * splitStep),
-            new TaskAttemptContext(job.getConfiguration(),
-                new TaskAttemptID()));
+            splits.get(i), samplingContext);
+        reader.initialize(splits.get(i), samplingContext);
         while (reader.nextKeyValue()) {
           ++records;
           if ((double) kept / records < freq) {
+            samples.add(ReflectionUtils.copy(job.getConfiguration(),
+                reader.getCurrentKey(), null));
             ++kept;
-            samples.add(reader.getCurrentKey());
           }
         }
         reader.close();
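
For context on how the patched samplers are consumed: InputSampler's writePartitionFile()
runs one of these samplers, sorts the sampled keys, and writes the partition boundaries
that TotalOrderPartitioner reads back. A hedged usage sketch against the backported
classes; the job wiring, paths, and Text key type are assumptions for illustration, not
part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.mapreduce.hadoopbackport.InputSampler;
    import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

    public class TotalOrderJobSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "total-order-sort");
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // Partition boundaries come from a 10% random sample, capped at
        // 10000 keys drawn from at most 10 splits.
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
            new Path(args[1], "_partitions"));
        InputSampler.Sampler<Text, Text> sampler =
            new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);
        InputSampler.writePartitionFile(job, sampler);

        job.setPartitionerClass(TotalOrderPartitioner.class);
        // ... reducer, output format, and job.waitForCompletion(true) follow.
      }
    }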