diff --git a/mapreduce/CHANGES.txt b/mapreduce/CHANGES.txt
index c0f4afa66a6..63707a324c0 100644
--- a/mapreduce/CHANGES.txt
+++ b/mapreduce/CHANGES.txt
@@ -362,6 +362,10 @@ Trunk (unreleased changes)
     MAPREDUCE-2463. Job history files are not moved to done folder when job
     history location is hdfs. (Devaraj K via szetszwo)
 
+    MAPREDUCE-2243. Close streams properly in a finally-block to avoid leakage
+    in CompletedJobStatusStore, TaskLog, EventWriter and TotalOrderPartitioner.
+    (Devaraj K via szetszwo)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES
diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java b/mapreduce/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java
index 10e4879a20f..16b399edeea 100644
--- a/mapreduce/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java
+++ b/mapreduce/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
 /**
@@ -172,8 +173,9 @@ class CompletedJobStatusStore implements Runnable {
     if (active && retainTime > 0) {
       JobID jobId = job.getStatus().getJobID();
       Path jobStatusFile = getInfoFilePath(jobId);
+      FSDataOutputStream dataOut = null;
       try {
-        FSDataOutputStream dataOut = fs.create(jobStatusFile);
+        dataOut = fs.create(jobStatusFile);
 
         job.getStatus().write(dataOut);
 
@@ -189,6 +191,7 @@ class CompletedJobStatusStore implements Runnable {
         }
 
         dataOut.close();
+        dataOut = null; // set dataOut to null explicitly so that close in finally will not be executed again.
       } catch (IOException ex) {
         LOG.warn("Could not store [" + jobId + "] job info : " +
                  ex.getMessage(), ex);
@@ -198,6 +201,8 @@ class CompletedJobStatusStore implements Runnable {
         catch (IOException ex1) {
           //ignore
         }
+      } finally {
+        IOUtils.cleanup(LOG, dataOut);
       }
     }
   }
diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/TaskLog.java b/mapreduce/src/java/org/apache/hadoop/mapred/TaskLog.java
index 7a5e55bb3e5..810eea3a413 100644
--- a/mapreduce/src/java/org/apache/hadoop/mapred/TaskLog.java
+++ b/mapreduce/src/java/org/apache/hadoop/mapred/TaskLog.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SecureIOUtils;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.util.ProcessTree;
@@ -111,34 +112,42 @@ public class TaskLog {
     //stderr:
     //syslog:
     LogFileDetail l = new LogFileDetail();
-    String str = fis.readLine();
-    if (str == null) { //the file doesn't have anything
-      throw new IOException ("Index file for the log of " + taskid+" doesn't exist.");
-    }
-    l.location = str.substring(str.indexOf(LogFileDetail.LOCATION)+
-        LogFileDetail.LOCATION.length());
-    //special cases are the debugout and profile.out files. They are guaranteed
-    //to be associated with each task attempt since jvm reuse is disabled
-    //when profiling/debugging is enabled
-    if (filter.equals(LogName.DEBUGOUT) || filter.equals(LogName.PROFILE)) {
-      l.length = new File(l.location, filter.toString()).length();
-      l.start = 0;
-      fis.close();
-      return l;
-    }
-    str = fis.readLine();
-    while (str != null) {
-      //look for the exact line containing the logname
-      if (str.contains(filter.toString())) {
-        str = str.substring(filter.toString().length()+1);
-        String[] startAndLen = str.split(" ");
-        l.start = Long.parseLong(startAndLen[0]);
-        l.length = Long.parseLong(startAndLen[1]);
-        break;
+    String str = null;
+    try {
+      str = fis.readLine();
+      if (str == null) { // the file doesn't have anything
+        throw new IOException("Index file for the log of " + taskid
+            + " doesn't exist.");
+      }
+      l.location = str.substring(str.indexOf(LogFileDetail.LOCATION)
+          + LogFileDetail.LOCATION.length());
+      // special cases are the debugout and profile.out files. They are
+      // guaranteed
+      // to be associated with each task attempt since jvm reuse is disabled
+      // when profiling/debugging is enabled
+      if (filter.equals(LogName.DEBUGOUT) || filter.equals(LogName.PROFILE)) {
+        l.length = new File(l.location, filter.toString()).length();
+        l.start = 0;
+        fis.close();
+        return l;
       }
       str = fis.readLine();
+      while (str != null) {
+        // look for the exact line containing the logname
+        if (str.contains(filter.toString())) {
+          str = str.substring(filter.toString().length() + 1);
+          String[] startAndLen = str.split(" ");
+          l.start = Long.parseLong(startAndLen[0]);
+          l.length = Long.parseLong(startAndLen[1]);
+          break;
+        }
+        str = fis.readLine();
+      }
+      fis.close();
+      fis = null;
+    } finally {
+      IOUtils.cleanup(LOG, fis);
     }
-    fis.close();
     return l;
   }
 
@@ -189,22 +198,27 @@ public class TaskLog {
     //LOG_DIR:
     //STDOUT:
     //STDERR:
-    //SYSLOG:
-    dos.writeBytes(LogFileDetail.LOCATION + logLocation + "\n"
-        + LogName.STDOUT.toString() + ":");
-    dos.writeBytes(Long.toString(prevOutLength) + " ");
-    dos.writeBytes(Long.toString(new File(logLocation, LogName.STDOUT
-        .toString()).length() - prevOutLength)
-        + "\n" + LogName.STDERR + ":");
-    dos.writeBytes(Long.toString(prevErrLength) + " ");
-    dos.writeBytes(Long.toString(new File(logLocation, LogName.STDERR
-        .toString()).length() - prevErrLength)
-        + "\n" + LogName.SYSLOG.toString() + ":");
-    dos.writeBytes(Long.toString(prevLogLength) + " ");
-    dos.writeBytes(Long.toString(new File(logLocation, LogName.SYSLOG
-        .toString()).length() - prevLogLength)
-        + "\n");
-    dos.close();
+    //SYSLOG:
+    try{
+      dos.writeBytes(LogFileDetail.LOCATION + logLocation + "\n"
+          + LogName.STDOUT.toString() + ":");
+      dos.writeBytes(Long.toString(prevOutLength) + " ");
+      dos.writeBytes(Long.toString(new File(logLocation, LogName.STDOUT
+          .toString()).length() - prevOutLength)
+          + "\n" + LogName.STDERR + ":");
+      dos.writeBytes(Long.toString(prevErrLength) + " ");
+      dos.writeBytes(Long.toString(new File(logLocation, LogName.STDERR
+          .toString()).length() - prevErrLength)
+          + "\n" + LogName.SYSLOG.toString() + ":");
+      dos.writeBytes(Long.toString(prevLogLength) + " ");
+      dos.writeBytes(Long.toString(new File(logLocation, LogName.SYSLOG
+          .toString()).length() - prevLogLength)
+          + "\n");
+      dos.close();
+      dos = null;
+    } finally {
+      IOUtils.cleanup(LOG, dos);
+    }
 
     File indexFile = getIndexFile(currentTaskid, isCleanup);
     Path indexFilePath = new Path(indexFile.getAbsolutePath());
diff --git a/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java b/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
index 400bcb28374..d8516a0a0d7 100644
--- a/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
+++ b/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.CounterGroup;
 import org.apache.hadoop.mapreduce.Counters;
@@ -33,6 +34,8 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.util.Utf8;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * Event Writer is an utility class used to write events to the underlying
@@ -47,6 +50,7 @@ class EventWriter {
   private DatumWriter<Event> writer =
     new SpecificDatumWriter<Event>(Event.class);
   private Encoder encoder;
+  private static final Log LOG = LogFactory.getLog(EventWriter.class);
 
   EventWriter(FSDataOutputStream out) throws IOException {
     this.out = out;
@@ -72,8 +76,13 @@ class EventWriter {
   }
 
   void close() throws IOException {
-    encoder.flush();
-    out.close();
+    try {
+      encoder.flush();
+      out.close();
+      out = null;
+    } finally {
+      IOUtils.cleanup(LOG, out);
+    }
   }
 
   private static final Schema GROUPS =
diff --git a/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java b/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java
index fe14926b1a0..fa12976c261 100644
--- a/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java
+++ b/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java
@@ -23,6 +23,8 @@ import java.lang.reflect.Array;
 import java.util.ArrayList;
 import java.util.Arrays;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configurable;
@@ -30,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.RawComparator;
@@ -56,6 +59,7 @@ public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
   public static final String NATURAL_ORDER =
     "mapreduce.totalorderpartitioner.naturalorder";
   Configuration conf;
+  private static final Log LOG = LogFactory.getLog(TotalOrderPartitioner.class);
 
   public TotalOrderPartitioner() { }
 
@@ -298,11 +302,16 @@ public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
     ArrayList<K> parts = new ArrayList<K>();
     K key = ReflectionUtils.newInstance(keyClass, conf);
     NullWritable value = NullWritable.get();
-    while (reader.next(key, value)) {
-      parts.add(key);
-      key = ReflectionUtils.newInstance(keyClass, conf);
+    try {
+      while (reader.next(key, value)) {
+        parts.add(key);
+        key = ReflectionUtils.newInstance(keyClass, conf);
+      }
+      reader.close();
+      reader = null;
+    } finally {
+      IOUtils.cleanup(LOG, reader);
     }
-    reader.close();
     return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
   }
 
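Note (not part of the patch): every hunk above applies the same idiom — keep the stream in a variable visible to a finally block, close it on the success path, null the reference, and let IOUtils.cleanup close whatever is still open when an exception unwinds the method. The following is a minimal, self-contained sketch of that idiom only; the class and method names (StreamCleanupExample, writeStatus) and the payload argument are illustrative and do not appear anywhere in the patch.

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class StreamCleanupExample {
  private static final Log LOG = LogFactory.getLog(StreamCleanupExample.class);

  /** Write a payload to a file without leaking the stream on error. */
  public static void writeStatus(FileSystem fs, Path file, byte[] payload)
      throws IOException {
    FSDataOutputStream out = null;
    try {
      out = fs.create(file);
      out.write(payload);
      out.close();  // success path: close normally so write errors surface to the caller
      out = null;   // mark as closed so the finally block does not close it a second time
    } finally {
      // Runs on every exit; IOUtils.cleanup skips null references and, for a
      // still-open stream, closes it and logs (rather than throws) any failure.
      IOUtils.cleanup(LOG, out);
    }
  }
}

The explicit "out = null" after the normal close is what keeps the original IOException semantics: a failure of the in-band close() still propagates, while a close triggered from the finally block is only logged.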