HADOOP-10328. Merging change r1570304 from trunk to branch-2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1570305 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Chris Nauroth 2014-02-20 19:04:04 +00:00
parent 7ba83fb31b
commit 631a915b94
2 changed files with 75 additions and 48 deletions

View File

@ -61,6 +61,9 @@ Release 2.4.0 - UNRELEASED
HADOOP-10346. Deadlock while logging tokens (jlowe) HADOOP-10346. Deadlock while logging tokens (jlowe)
HADOOP-10328. loadGenerator exit code is not reliable.
(Haohui Mai via cnauroth)
Release 2.3.1 - UNRELEASED Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -26,6 +26,7 @@ import java.io.InputStream;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.Random; import java.util.Random;
@ -39,6 +40,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Options.CreateOpts; import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
@ -137,11 +139,15 @@ public class LoadGenerator extends Configured implements Tool {
"-startTime <startTimeInMillis>\n" + "-startTime <startTimeInMillis>\n" +
"-scriptFile <filename>"; "-scriptFile <filename>";
final private String hostname; final private String hostname;
private final byte[] WRITE_CONTENTS = new byte[4096];
private static final int ERR_TEST_FAILED = 2;
/** Constructor */ /** Constructor */
public LoadGenerator() throws IOException, UnknownHostException { public LoadGenerator() throws IOException, UnknownHostException {
InetAddress addr = InetAddress.getLocalHost(); InetAddress addr = InetAddress.getLocalHost();
hostname = addr.getHostName(); hostname = addr.getHostName();
Arrays.fill(WRITE_CONTENTS, (byte) 'a');
} }
private final static int OPEN = 0; private final static int OPEN = 0;
@ -178,7 +184,8 @@ public class LoadGenerator extends Configured implements Tool {
private long [] executionTime = new long[TOTAL_OP_TYPES]; private long [] executionTime = new long[TOTAL_OP_TYPES];
private long [] totalNumOfOps = new long[TOTAL_OP_TYPES]; private long [] totalNumOfOps = new long[TOTAL_OP_TYPES];
private byte[] buffer = new byte[1024]; private byte[] buffer = new byte[1024];
private boolean failed;
private DFSClientThread(int id) { private DFSClientThread(int id) {
this.id = id; this.id = id;
} }
@ -196,6 +203,7 @@ public class LoadGenerator extends Configured implements Tool {
} catch (Exception ioe) { } catch (Exception ioe) {
System.err.println(ioe.getLocalizedMessage()); System.err.println(ioe.getLocalizedMessage());
ioe.printStackTrace(); ioe.printStackTrace();
failed = true;
} }
} }
@ -272,6 +280,35 @@ public class LoadGenerator extends Configured implements Tool {
executionTime[LIST] += (Time.now()-startTime); executionTime[LIST] += (Time.now()-startTime);
totalNumOfOps[LIST]++; totalNumOfOps[LIST]++;
} }
/** Create a file with a length of <code>fileSize</code>.
* The file is filled with 'a'.
*/
private void genFile(Path file, long fileSize) throws IOException {
long startTime = Time.now();
FSDataOutputStream out = null;
try {
out = fc.create(file,
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
CreateOpts.createParent(), CreateOpts.bufferSize(4096),
CreateOpts.repFac((short) 3));
executionTime[CREATE] += (Time.now() - startTime);
totalNumOfOps[CREATE]++;
long i = fileSize;
while (i > 0) {
long s = Math.min(fileSize, WRITE_CONTENTS.length);
out.write(WRITE_CONTENTS, 0, (int) s);
i -= s;
}
startTime = Time.now();
executionTime[WRITE_CLOSE] += (Time.now() - startTime);
totalNumOfOps[WRITE_CLOSE]++;
} finally {
IOUtils.cleanup(LOG, out);
}
}
} }
/** Main function: /** Main function:
@ -319,13 +356,21 @@ public class LoadGenerator extends Configured implements Tool {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Done with testing. Waiting for threads to finish."); LOG.debug("Done with testing. Waiting for threads to finish.");
} }
boolean failed = false;
for (DFSClientThread thread : threads) { for (DFSClientThread thread : threads) {
thread.join(); thread.join();
for (int i=0; i<TOTAL_OP_TYPES; i++) { for (int i=0; i<TOTAL_OP_TYPES; i++) {
executionTime[i] += thread.executionTime[i]; executionTime[i] += thread.executionTime[i];
totalNumOfOps[i] += thread.totalNumOfOps[i]; totalNumOfOps[i] += thread.totalNumOfOps[i];
} }
failed = failed || thread.failed;
} }
if (failed) {
exitCode = -ERR_TEST_FAILED;
}
long totalOps = 0; long totalOps = 0;
for (int i=0; i<TOTAL_OP_TYPES; i++) { for (int i=0; i<TOTAL_OP_TYPES; i++) {
totalOps += totalNumOfOps[i]; totalOps += totalNumOfOps[i];
@ -462,50 +507,50 @@ public class LoadGenerator extends Configured implements Tool {
String line; String line;
// Read script, parse values, build array of duration, read and write probs // Read script, parse values, build array of duration, read and write probs
while((line = br.readLine()) != null) {
while ((line = br.readLine()) != null) {
lineNum++; lineNum++;
if(line.startsWith("#") || line.isEmpty()) // skip comments and blanks if (line.startsWith("#") || line.isEmpty()) // skip comments and blanks
continue; continue;
String[] a = line.split("\\s"); String[] a = line.split("\\s");
if(a.length != 3) { if (a.length != 3) {
System.err.println("Line " + lineNum + System.err.println("Line " + lineNum
": Incorrect number of parameters: " + line); + ": Incorrect number of parameters: " + line);
} }
try { try {
long d = Long.parseLong(a[0]); long d = Long.parseLong(a[0]);
if(d < 0) { if (d < 0) {
System.err.println("Line " + lineNum + ": Invalid duration: " + d); System.err.println("Line " + lineNum + ": Invalid duration: " + d);
return -1; return -1;
} }
double r = Double.parseDouble(a[1]); double r = Double.parseDouble(a[1]);
if(r < 0.0 || r > 1.0 ) { if (r < 0.0 || r > 1.0) {
System.err.println("Line " + lineNum + System.err.println("Line " + lineNum
": The read probability must be [0, 1]: " + r); + ": The read probability must be [0, 1]: " + r);
return -1;
}
double w = Double.parseDouble(a[2]);
if(w < 0.0 || w > 1.0) {
System.err.println("Line " + lineNum +
": The read probability must be [0, 1]: " + r);
return -1; return -1;
} }
double w = Double.parseDouble(a[2]);
if (w < 0.0 || w > 1.0) {
System.err.println("Line " + lineNum
+ ": The read probability must be [0, 1]: " + r);
return -1;
}
readProb.add(r); readProb.add(r);
duration.add(d); duration.add(d);
writeProb.add(w); writeProb.add(w);
} catch( NumberFormatException nfe) { } catch (NumberFormatException nfe) {
System.err.println(lineNum + ": Can't parse: " + line); System.err.println(lineNum + ": Can't parse: " + line);
return -1; return -1;
} finally {
IOUtils.cleanup(LOG, br);
} }
} }
br.close();
fr.close();
// Copy vectors to arrays of values, to avoid autoboxing overhead later // Copy vectors to arrays of values, to avoid autoboxing overhead later
durations = new long[duration.size()]; durations = new long[duration.size()];
readProbs = new double[readProb.size()]; readProbs = new double[readProb.size()];
@ -581,27 +626,6 @@ public class LoadGenerator extends Configured implements Tool {
} }
} }
} }
/** Create a file with a length of <code>fileSize</code>.
* The file is filled with 'a'.
*/
private void genFile(Path file, long fileSize) throws IOException {
long startTime = Time.now();
FSDataOutputStream out = fc.create(file,
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
CreateOpts.createParent(), CreateOpts.bufferSize(4096),
CreateOpts.repFac((short) 3));
executionTime[CREATE] += (Time.now()-startTime);
totalNumOfOps[CREATE]++;
for (long i=0; i<fileSize; i++) {
out.writeByte('a');
}
startTime = Time.now();
out.close();
executionTime[WRITE_CLOSE] += (Time.now()-startTime);
totalNumOfOps[WRITE_CLOSE]++;
}
/** Main program /** Main program
* *