From d5ef991ac44e3180a697482f2b3423699aab70eb Mon Sep 17 00:00:00 2001 From: ramkrishna Date: Sat, 26 May 2012 17:13:17 +0000 Subject: [PATCH] HBASE-6002 Possible chance of resource leak in HlogSplitter (Chinna Rao) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1342929 13f79535-47bb-0310-9956-ffa450edef68 --- .../hbase/regionserver/wal/HLogSplitter.java | 113 +++++++++++------- 1 file changed, 73 insertions(+), 40 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index 35f04138129..2d972fd4a0b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -456,51 +456,79 @@ public class HLogSplitter { e = RemoteExceptionHandler.checkIOException(e); throw e; } finally { - int n = 0; - for (Map.Entry logWritersEntry : logWriters.entrySet()) { - Object o = logWritersEntry.getValue(); - long t1 = EnvironmentEdgeManager.currentTimeMillis(); - if ((t1 - last_report_at) > period) { - last_report_at = t; - if ((progress_failed == false) && (reporter != null) && - (reporter.progress() == false)) { - progress_failed = true; + boolean allWritersClosed = false; + try { + int n = 0; + for (Map.Entry logWritersEntry : logWriters.entrySet()) { + Object o = logWritersEntry.getValue(); + long t1 = EnvironmentEdgeManager.currentTimeMillis(); + if ((t1 - last_report_at) > period) { + last_report_at = t; + if ((progress_failed == false) && (reporter != null) + && (reporter.progress() == false)) { + progress_failed = true; + } + } + if (o == BAD_WRITER) { + continue; + } + n++; + WriterAndPath wap = (WriterAndPath) o; + try { + wap.writerClosed = true; + wap.w.close(); + LOG.debug("Closed " + wap.p); + } catch (IOException e) { + LOG.debug("Exception while closing the writer :", e); + } + Path dst = getCompletedRecoveredEditsFilePath(wap.p, outputSink + .getRegionMaximumEditLogSeqNum(logWritersEntry.getKey())); + if (!dst.equals(wap.p) && fs.exists(dst)) { + LOG.warn("Found existing old edits file. It could be the " + + "result of a previous failed split attempt. Deleting " + dst + + ", length=" + fs.getFileStatus(dst).getLen()); + if (!fs.delete(dst, false)) { + LOG.warn("Failed deleting of old " + dst); + throw new IOException("Failed deleting of old " + dst); + } + } + // Skip the unit tests which create a splitter that reads and writes + // the + // data without touching disk. TestHLogSplit#testThreading is an + // example. + if (fs.exists(wap.p)) { + if (!fs.rename(wap.p, dst)) { + throw new IOException("Failed renaming " + wap.p + " to " + dst); + } + LOG.debug("Rename " + wap.p + " to " + dst); } } - if (o == BAD_WRITER) { - continue; - } - n++; - WriterAndPath wap = (WriterAndPath)o; - wap.w.close(); - LOG.debug("Closed " + wap.p); - Path dst = getCompletedRecoveredEditsFilePath(wap.p, - outputSink.getRegionMaximumEditLogSeqNum(logWritersEntry.getKey())); - if (!dst.equals(wap.p) && fs.exists(dst)) { - LOG.warn("Found existing old edits file. It could be the " - + "result of a previous failed split attempt. Deleting " + dst - + ", length=" + fs.getFileStatus(dst).getLen()); - if (!fs.delete(dst, false)) { - LOG.warn("Failed deleting of old " + dst); - throw new IOException("Failed deleting of old " + dst); + allWritersClosed = true; + String msg = "Processed " + editsCount + " edits across " + n + + " regions" + " threw away edits for " + (logWriters.size() - n) + + " regions" + "; log file=" + logPath + " is corrupted = " + + isCorrupted + " progress failed = " + progress_failed; + LOG.info(msg); + status.markComplete(msg); + } finally { + if (!allWritersClosed) { + for (Map.Entry logWritersEntry : logWriters.entrySet()) { + Object o = logWritersEntry.getValue(); + if (o != BAD_WRITER) { + WriterAndPath wap = (WriterAndPath) o; + try { + if (!wap.writerClosed) { + wap.writerClosed = true; + wap.w.close(); + } + } catch (IOException e) { + LOG.debug("Exception while closing the writer :", e); + } + } } } - // Skip the unit tests which create a splitter that reads and writes the - // data without touching disk. TestHLogSplit#testThreading is an - // example. - if (fs.exists(wap.p)) { - if (!fs.rename(wap.p, dst)) { - throw new IOException("Failed renaming " + wap.p + " to " + dst); - } - LOG.debug("Rename " + wap.p + " to " + dst); - } + in.close(); } - String msg = "Processed " + editsCount + " edits across " + n + " regions" + - " threw away edits for " + (logWriters.size() - n) + " regions" + - "; log file=" + logPath + " is corrupted = " + isCorrupted + - " progress failed = " + progress_failed; - LOG.info(msg); - status.markComplete(msg); } return !progress_failed; } @@ -1349,6 +1377,11 @@ public class HLogSplitter { long editsWritten = 0; /* Number of nanos spent writing to this log */ long nanosSpent = 0; + + /* To check whether a close has already been tried on the + * writer + */ + boolean writerClosed = false; WriterAndPath(final Path p, final Writer w) { this.p = p;