HBASE-6002 Possible chance of resource leak in HlogSplitter (Chinna Rao)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1342929 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
ramkrishna 2012-05-26 17:13:17 +00:00
parent 52a859360d
commit d5ef991ac4
1 changed files with 73 additions and 40 deletions

View File

@ -456,51 +456,79 @@ public class HLogSplitter {
e = RemoteExceptionHandler.checkIOException(e); e = RemoteExceptionHandler.checkIOException(e);
throw e; throw e;
} finally { } finally {
int n = 0; boolean allWritersClosed = false;
for (Map.Entry<byte[], Object> logWritersEntry : logWriters.entrySet()) { try {
Object o = logWritersEntry.getValue(); int n = 0;
long t1 = EnvironmentEdgeManager.currentTimeMillis(); for (Map.Entry<byte[], Object> logWritersEntry : logWriters.entrySet()) {
if ((t1 - last_report_at) > period) { Object o = logWritersEntry.getValue();
last_report_at = t; long t1 = EnvironmentEdgeManager.currentTimeMillis();
if ((progress_failed == false) && (reporter != null) && if ((t1 - last_report_at) > period) {
(reporter.progress() == false)) { last_report_at = t;
progress_failed = true; if ((progress_failed == false) && (reporter != null)
&& (reporter.progress() == false)) {
progress_failed = true;
}
}
if (o == BAD_WRITER) {
continue;
}
n++;
WriterAndPath wap = (WriterAndPath) o;
try {
wap.writerClosed = true;
wap.w.close();
LOG.debug("Closed " + wap.p);
} catch (IOException e) {
LOG.debug("Exception while closing the writer :", e);
}
Path dst = getCompletedRecoveredEditsFilePath(wap.p, outputSink
.getRegionMaximumEditLogSeqNum(logWritersEntry.getKey()));
if (!dst.equals(wap.p) && fs.exists(dst)) {
LOG.warn("Found existing old edits file. It could be the "
+ "result of a previous failed split attempt. Deleting " + dst
+ ", length=" + fs.getFileStatus(dst).getLen());
if (!fs.delete(dst, false)) {
LOG.warn("Failed deleting of old " + dst);
throw new IOException("Failed deleting of old " + dst);
}
}
// Skip the unit tests which create a splitter that reads and writes
// the
// data without touching disk. TestHLogSplit#testThreading is an
// example.
if (fs.exists(wap.p)) {
if (!fs.rename(wap.p, dst)) {
throw new IOException("Failed renaming " + wap.p + " to " + dst);
}
LOG.debug("Rename " + wap.p + " to " + dst);
} }
} }
if (o == BAD_WRITER) { allWritersClosed = true;
continue; String msg = "Processed " + editsCount + " edits across " + n
} + " regions" + " threw away edits for " + (logWriters.size() - n)
n++; + " regions" + "; log file=" + logPath + " is corrupted = "
WriterAndPath wap = (WriterAndPath)o; + isCorrupted + " progress failed = " + progress_failed;
wap.w.close(); LOG.info(msg);
LOG.debug("Closed " + wap.p); status.markComplete(msg);
Path dst = getCompletedRecoveredEditsFilePath(wap.p, } finally {
outputSink.getRegionMaximumEditLogSeqNum(logWritersEntry.getKey())); if (!allWritersClosed) {
if (!dst.equals(wap.p) && fs.exists(dst)) { for (Map.Entry<byte[], Object> logWritersEntry : logWriters.entrySet()) {
LOG.warn("Found existing old edits file. It could be the " Object o = logWritersEntry.getValue();
+ "result of a previous failed split attempt. Deleting " + dst if (o != BAD_WRITER) {
+ ", length=" + fs.getFileStatus(dst).getLen()); WriterAndPath wap = (WriterAndPath) o;
if (!fs.delete(dst, false)) { try {
LOG.warn("Failed deleting of old " + dst); if (!wap.writerClosed) {
throw new IOException("Failed deleting of old " + dst); wap.writerClosed = true;
wap.w.close();
}
} catch (IOException e) {
LOG.debug("Exception while closing the writer :", e);
}
}
} }
} }
// Skip the unit tests which create a splitter that reads and writes the in.close();
// data without touching disk. TestHLogSplit#testThreading is an
// example.
if (fs.exists(wap.p)) {
if (!fs.rename(wap.p, dst)) {
throw new IOException("Failed renaming " + wap.p + " to " + dst);
}
LOG.debug("Rename " + wap.p + " to " + dst);
}
} }
String msg = "Processed " + editsCount + " edits across " + n + " regions" +
" threw away edits for " + (logWriters.size() - n) + " regions" +
"; log file=" + logPath + " is corrupted = " + isCorrupted +
" progress failed = " + progress_failed;
LOG.info(msg);
status.markComplete(msg);
} }
return !progress_failed; return !progress_failed;
} }
@ -1349,6 +1377,11 @@ public class HLogSplitter {
long editsWritten = 0; long editsWritten = 0;
/* Number of nanos spent writing to this log */ /* Number of nanos spent writing to this log */
long nanosSpent = 0; long nanosSpent = 0;
/* To check whether a close has already been tried on the
* writer
*/
boolean writerClosed = false;
WriterAndPath(final Path p, final Writer w) { WriterAndPath(final Path p, final Writer w) {
this.p = p; this.p = p;