MAPREDUCE-4645. Provide a random seed to Slive to make the sequence of file names deterministic. Contributed by Ravi Prakash.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1389595 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Konstantin Shvachko 2012-09-24 21:17:29 +00:00
parent 38ec8babda
commit 7cf9362055
3 changed files with 21 additions and 21 deletions

View File

@ -409,6 +409,9 @@ Release 0.23.4 - UNRELEASED
IMPROVEMENTS IMPROVEMENTS
MAPREDUCE-4645. Provide a random seed to Slive to make the sequence
of file names deterministic. (Ravi Prakash via shv)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -41,7 +41,9 @@ abstract class Operation {
this.config = cfg; this.config = cfg;
this.type = type; this.type = type;
this.rnd = rnd; this.rnd = rnd;
this.finder = new PathFinder(cfg, rnd); // Use a new Random instance so that the sequence of file names produced is
// the same even in case of unsuccessful operations
this.finder = new PathFinder(cfg, new Random(rnd.nextInt()));
} }
/** /**

View File

@ -32,6 +32,8 @@ import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
/** /**
@ -50,8 +52,7 @@ public class SliveMapper extends MapReduceBase implements
private FileSystem filesystem; private FileSystem filesystem;
private ConfigExtractor config; private ConfigExtractor config;
private WeightSelector selector; private int taskId;
private Random rnd;
/* /*
* (non-Javadoc) * (non-Javadoc)
@ -70,19 +71,19 @@ public class SliveMapper extends MapReduceBase implements
} }
try { try {
config = new ConfigExtractor(conf); config = new ConfigExtractor(conf);
Long rndSeed = config.getRandomSeed();
if (rndSeed != null) {
rnd = new Random(rndSeed);
} else {
rnd = new Random();
}
selector = new WeightSelector(config, rnd);
ConfigExtractor.dumpOptions(config); ConfigExtractor.dumpOptions(config);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Unable to setup slive " + StringUtils.stringifyException(e)); LOG.error("Unable to setup slive " + StringUtils.stringifyException(e));
throw new RuntimeException("Unable to setup slive configuration", e); throw new RuntimeException("Unable to setup slive configuration", e);
} }
if(conf.get(MRJobConfig.TASK_ATTEMPT_ID) != null ) {
this.taskId = TaskAttemptID.forName(conf.get(MRJobConfig.TASK_ATTEMPT_ID))
.getTaskID().getId();
} else {
// So that branch-1/0.20 can run this same code as well
this.taskId = TaskAttemptID.forName(conf.get("mapred.task.id"))
.getTaskID().getId();
}
} }
/** /**
@ -94,15 +95,6 @@ public class SliveMapper extends MapReduceBase implements
return config; return config;
} }
/**
* Gets the operation selector to use for this object
*
* @return WeightSelector
*/
private WeightSelector getSelector() {
return selector;
}
/** /**
* Logs to the given reporter and logs to the internal logger at info level * Logs to the given reporter and logs to the internal logger at info level
* *
@ -154,6 +146,10 @@ public class SliveMapper extends MapReduceBase implements
Reporter reporter) throws IOException { Reporter reporter) throws IOException {
logAndSetStatus(reporter, "Running slive mapper for dummy key " + key logAndSetStatus(reporter, "Running slive mapper for dummy key " + key
+ " and dummy value " + value); + " and dummy value " + value);
//Add taskID to randomSeed to deterministically seed rnd.
Random rnd = config.getRandomSeed() != null ?
new Random(this.taskId + config.getRandomSeed()) : new Random();
WeightSelector selector = new WeightSelector(config, rnd);
long startTime = Timer.now(); long startTime = Timer.now();
long opAm = 0; long opAm = 0;
long sleepOps = 0; long sleepOps = 0;
@ -163,7 +159,6 @@ public class SliveMapper extends MapReduceBase implements
if (sleepRange != null) { if (sleepRange != null) {
sleeper = new SleepOp(getConfig(), rnd); sleeper = new SleepOp(getConfig(), rnd);
} }
WeightSelector selector = getSelector();
while (Timer.elapsed(startTime) < duration) { while (Timer.elapsed(startTime) < duration) {
try { try {
logAndSetStatus(reporter, "Attempting to select operation #" logAndSetStatus(reporter, "Attempting to select operation #"