From 7b0964f354e90968c2dac2f7acc17214732aed64 Mon Sep 17 00:00:00 2001
From: Akira Ajisaka
Date: Thu, 14 Jan 2016 10:40:22 +0900
Subject: [PATCH] MAPREDUCE-6363. [NNBench] Lease mismatch error when running
 with multiple mappers. Contributed by Vlad Sharanhovich and Bibin A Chundatt.

---
 hadoop-mapreduce-project/CHANGES.txt          |   3 +
 .../java/org/apache/hadoop/hdfs/NNBench.java  | 136 +++++++++---------
 2 files changed, 73 insertions(+), 66 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index e6bc0507ccb..7d5d11aaf29 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -1009,6 +1009,9 @@ Release 2.6.4 - UNRELEASED
     TaskAttemptImpl#sendJHStartEventForAssignedFailTask (Bibin A Chundatt
     via jlowe)
 
+    MAPREDUCE-6363. [NNBench] Lease mismatch error when running with multiple
+    mappers. (Vlad Sharanhovich and Bibin A Chundatt via aajisaka)
+
 Release 2.6.3 - 2015-12-17
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/NNBench.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/NNBench.java
index b6c010416bb..666ef0ec55a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/NNBench.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/NNBench.java
@@ -18,45 +18,42 @@
 package org.apache.hadoop.hdfs;
 
-import java.io.IOException;
-import java.util.Date;
+import java.io.BufferedReader;
 import java.io.DataInputStream;
+import java.io.File;
 import java.io.FileOutputStream;
+import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.PrintStream;
-import java.io.File;
-import java.io.BufferedReader;
-import java.util.StringTokenizer;
 import java.net.InetAddress;
 import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.Iterator;
+import java.util.StringTokenizer;
 
-import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
-
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile;
-
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
 
 /**
  * This program executes a specified operation that applies load to
@@ -149,7 +146,7 @@ private static void createControlFiles() throws IOException {
       try {
         writer = SequenceFile.createWriter(tempFS, config, filePath, Text.class,
             LongWritable.class, CompressionType.NONE);
-        writer.append(new Text(strFileName), new LongWritable(0l));
+        writer.append(new Text(strFileName), new LongWritable(i));
       } finally {
         if (writer != null) {
           writer.close();
         }
       }
     }
@@ -309,14 +306,7 @@ public static void parseInputs(final String[] args) {
    */
   private static void analyzeResults() throws IOException {
     final FileSystem fs = FileSystem.get(config);
-    Path reduceFile = new Path(new Path(baseDir, OUTPUT_DIR_NAME),
-        "part-00000");
-
-    DataInputStream in;
-    in = new DataInputStream(fs.open(reduceFile));
-
-    BufferedReader lines;
-    lines = new BufferedReader(new InputStreamReader(in));
+    Path reduceDir = new Path(baseDir, OUTPUT_DIR_NAME);
 
     long totalTimeAL1 = 0l;
     long totalTimeAL2 = 0l;
@@ -327,32 +317,38 @@ private static void analyzeResults() throws IOException {
 
     long mapStartTimeTPmS = 0l;
     long mapEndTimeTPmS = 0l;
-
-    String resultTPSLine1 = null;
-    String resultTPSLine2 = null;
-    String resultALLine1 = null;
-    String resultALLine2 = null;
-
-    String line;
-    while((line = lines.readLine()) != null) {
-      StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%;");
-      String attr = tokens.nextToken();
-      if (attr.endsWith(":totalTimeAL1")) {
-        totalTimeAL1 = Long.parseLong(tokens.nextToken());
-      } else if (attr.endsWith(":totalTimeAL2")) {
-        totalTimeAL2 = Long.parseLong(tokens.nextToken());
-      } else if (attr.endsWith(":totalTimeTPmS")) {
-        totalTimeTPmS = Long.parseLong(tokens.nextToken());
-      } else if (attr.endsWith(":latemaps")) {
-        lateMaps = Long.parseLong(tokens.nextToken());
-      } else if (attr.endsWith(":numOfExceptions")) {
-        numOfExceptions = Long.parseLong(tokens.nextToken());
-      } else if (attr.endsWith(":successfulFileOps")) {
-        successfulFileOps = Long.parseLong(tokens.nextToken());
-      } else if (attr.endsWith(":mapStartTimeTPmS")) {
-        mapStartTimeTPmS = Long.parseLong(tokens.nextToken());
-      } else if (attr.endsWith(":mapEndTimeTPmS")) {
-        mapEndTimeTPmS = Long.parseLong(tokens.nextToken());
+
+    FileStatus[] fss = fs.listStatus(reduceDir);
+    for (FileStatus status : fss) {
+
+      Path reduceFile = status.getPath();
+      DataInputStream in;
+      in = new DataInputStream(fs.open(reduceFile));
+
+      BufferedReader lines;
+      lines = new BufferedReader(new InputStreamReader(in));
+
+      String line;
+      while ((line = lines.readLine()) != null) {
+        StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%;");
+        String attr = tokens.nextToken();
+        if (attr.endsWith(":totalTimeAL1")) {
+          totalTimeAL1 = Long.parseLong(tokens.nextToken());
+        } else if (attr.endsWith(":totalTimeAL2")) {
+          totalTimeAL2 = Long.parseLong(tokens.nextToken());
+        } else if (attr.endsWith(":totalTimeTPmS")) {
+          totalTimeTPmS = Long.parseLong(tokens.nextToken());
+        } else if (attr.endsWith(":latemaps")) {
+          lateMaps = Long.parseLong(tokens.nextToken());
+        } else if (attr.endsWith(":numOfExceptions")) {
+          numOfExceptions = Long.parseLong(tokens.nextToken());
+        } else if (attr.endsWith(":successfulFileOps")) {
+          successfulFileOps = Long.parseLong(tokens.nextToken());
+        } else if (attr.endsWith(":mapStartTimeTPmS")) {
+          mapStartTimeTPmS = Long.parseLong(tokens.nextToken());
+        } else if (attr.endsWith(":mapEndTimeTPmS")) {
+          mapEndTimeTPmS = Long.parseLong(tokens.nextToken());
+        }
       }
     }
 
@@ -377,6 +373,11 @@ private static void analyzeResults() throws IOException {
         (double) successfulFileOps :
         (double) totalTimeTPmS / successfulFileOps;
 
+    String resultTPSLine1 = null;
+    String resultTPSLine2 = null;
+    String resultALLine1 = null;
+    String resultALLine2 = null;
+
     if (operation.equals(OP_CREATE_WRITE)) {
       // For create/write/close, it is treated as two transactions,
       // since a file create from a client perspective involves create and close
@@ -699,18 +700,21 @@ public void map(Text key,
       successfulFileOps = 0l;
 
       if (barrier()) {
+        String fileName = "file_" + value;
         if (op.equals(OP_CREATE_WRITE)) {
           startTimeTPmS = System.currentTimeMillis();
-          doCreateWriteOp("file_" + hostName + "_", reporter);
+          doCreateWriteOp(fileName, reporter);
         } else if (op.equals(OP_OPEN_READ)) {
           startTimeTPmS = System.currentTimeMillis();
-          doOpenReadOp("file_" + hostName + "_", reporter);
+          doOpenReadOp(fileName, reporter);
         } else if (op.equals(OP_RENAME)) {
           startTimeTPmS = System.currentTimeMillis();
-          doRenameOp("file_" + hostName + "_", reporter);
+          doRenameOp(fileName, reporter);
         } else if (op.equals(OP_DELETE)) {
           startTimeTPmS = System.currentTimeMillis();
-          doDeleteOp("file_" + hostName + "_", reporter);
+        } else {
+          throw new IllegalArgumentException(
+              "unsupported operation [" + op + "]");
         }
 
         endTimeTPms = System.currentTimeMillis();
@@ -777,9 +781,8 @@ private void doCreateWriteOp(String name,
 
           reporter.setStatus("Finish "+ l + " files");
         } catch (IOException e) {
-          LOG.info("Exception recorded in op: " +
-              "Create/Write/Close");
-
+          LOG.error("Exception recorded in op: Create/Write/Close, "
+              + "file: \"" + filePath + "\"", e);
           numOfExceptions++;
         }
       }
@@ -822,7 +825,8 @@ private void doOpenReadOp(String name,
 
           reporter.setStatus("Finish "+ l + " files");
         } catch (IOException e) {
-          LOG.info("Exception recorded in op: OpenRead " + e);
+          LOG.error("Exception recorded in op: OpenRead, " + "file: \""
+              + filePath + "\"", e);
           numOfExceptions++;
         }
       }
@@ -856,8 +860,8 @@ private void doRenameOp(String name,
 
           reporter.setStatus("Finish "+ l + " files");
         } catch (IOException e) {
-          LOG.info("Exception recorded in op: Rename");
-
+          LOG.error("Exception recorded in op: Rename, " + "file: \""
+              + filePath + "\"", e);
           numOfExceptions++;
         }
       }
@@ -889,8 +893,8 @@ private void doDeleteOp(String name,
 
           reporter.setStatus("Finish "+ l + " files");
         } catch (IOException e) {
-          LOG.info("Exception in recorded op: Delete");
-
+          LOG.error("Exception recorded in op: Delete, " + "file: \""
+              + filePath + "\"", e);
           numOfExceptions++;
         }
       }
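
Two aspects of this change are worth spelling out for readers skimming the diff.

First, the fix itself. Before this patch, every mapper built its working file
names from the local hostname ("file_" + hostName + "_"), so two mappers
scheduled on the same host raced to create the same HDFS paths, and the
NameNode grants the lease on a path to exactly one client, hence the lease
mismatch errors. The patch writes LongWritable(i) into the i-th control file
(see the createControlFiles() hunk) and has map() derive the name as
"file_" + value, so each mapper operates on a disjoint set of names. The
sketch below is a plain-Java illustration, not part of the patch; the class
name, host name, and two-mappers-on-one-host scenario are assumptions made
for the example:

import java.util.HashSet;
import java.util.Set;

public class NNBenchNamingSketch {
  public static void main(String[] args) {
    final String hostName = "host-1"; // two mappers land on the same host
    final int numberOfMaps = 2;

    // Old scheme: both mappers compute the identical name for their first
    // file, so the second concurrent create() on that path is rejected by
    // the NameNode with a lease mismatch.
    Set<String> oldNames = new HashSet<>();
    for (int mapper = 0; mapper < numberOfMaps; mapper++) {
      // "0" stands for the first file index a mapper would create.
      boolean unique = oldNames.add("file_" + hostName + "_" + 0);
      System.out.println("old naming, mapper " + mapper
          + ": unique = " + unique); // second mapper prints false
    }

    // New scheme: mapper i reads LongWritable(i) from its control file and
    // prefixes its files with "file_" + value, so names cannot collide no
    // matter how the mappers are placed.
    Set<String> newNames = new HashSet<>();
    for (long value = 0; value < numberOfMaps; value++) {
      boolean unique = newNames.add("file_" + value);
      System.out.println("new naming, mapper " + value
          + ": unique = " + unique); // always true
    }
  }
}

Second, a caveat visible in the map() hunk: the OP_DELETE branch keeps its
startTimeTPmS assignment, but the old doDeleteOp(...) call is removed without
a fileName-based replacement, unlike the other three operations. As committed,
the delete benchmark therefore no longer issues deletes. This looks
unintentional; presumably the intended line is doDeleteOp(fileName, reporter);,
which is what later Hadoop versions contain after a follow-up fix.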