HBASE-5235 HLogSplitter writer thread's streams not getting closed when any of the writer threads has exceptions.(Ram)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1234509 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
ramkrishna 2012-01-22 13:20:15 +00:00
parent da0f65de98
commit fbf165d347
2 changed files with 49 additions and 25 deletions

View File

@ -7,6 +7,8 @@ Release 0.92.1 - Unreleased
BUG FIXES BUG FIXES
HBASE-5176 AssignmentManager#getRegion: logging nit adds a redundant '+' (Karthik K) HBASE-5176 AssignmentManager#getRegion: logging nit adds a redundant '+' (Karthik K)
HBASE-5237 Addendum for HBASE-5160 and HBASE-4397 (Ram) HBASE-5237 Addendum for HBASE-5160 and HBASE-4397 (Ram)
HBASE-5235 HLogSplitter writer thread's streams not getting closed when any
of the writer threads has exceptions. (Ram)
TESTS TESTS
HBASE-5223 TestMetaReaderEditor is missing call to CatalogTracker.stop() HBASE-5223 TestMetaReaderEditor is missing call to CatalogTracker.stop()

View File

@ -1142,7 +1142,9 @@ public class HLogSplitter {
private final Set<byte[]> blacklistedRegions = Collections.synchronizedSet( private final Set<byte[]> blacklistedRegions = Collections.synchronizedSet(
new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR)); new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
private boolean hasClosed = false; private boolean closeAndCleanCompleted = false;
private boolean logWritersClosed = false;
/** /**
* Start the threads that will pump data from the entryBuffers * Start the threads that will pump data from the entryBuffers
@ -1167,20 +1169,27 @@ public class HLogSplitter {
List<Path> finishWritingAndClose() throws IOException { List<Path> finishWritingAndClose() throws IOException {
LOG.info("Waiting for split writer threads to finish"); LOG.info("Waiting for split writer threads to finish");
for (WriterThread t : writerThreads) { try {
t.finish(); for (WriterThread t : writerThreads) {
} t.finish();
for (WriterThread t: writerThreads) {
try {
t.join();
} catch (InterruptedException ie) {
throw new IOException(ie);
} }
checkForErrors(); for (WriterThread t : writerThreads) {
} try {
LOG.info("Split writers finished"); t.join();
} catch (InterruptedException ie) {
throw new IOException(ie);
}
checkForErrors();
}
LOG.info("Split writers finished");
return closeStreams(); return closeStreams();
} finally {
List<IOException> thrown = closeLogWriters(null);
if (thrown != null && !thrown.isEmpty()) {
throw MultipleIOException.createIOException(thrown);
}
}
} }
/** /**
@ -1188,21 +1197,12 @@ public class HLogSplitter {
* @return the list of paths written. * @return the list of paths written.
*/ */
private List<Path> closeStreams() throws IOException { private List<Path> closeStreams() throws IOException {
Preconditions.checkState(!hasClosed); Preconditions.checkState(!closeAndCleanCompleted);
List<Path> paths = new ArrayList<Path>(); List<Path> paths = new ArrayList<Path>();
List<IOException> thrown = Lists.newArrayList(); List<IOException> thrown = Lists.newArrayList();
closeLogWriters(thrown);
for (WriterAndPath wap : logWriters.values()) { for (WriterAndPath wap : logWriters.values()) {
try {
wap.w.close();
} catch (IOException ioe) {
LOG.error("Couldn't close log at " + wap.p, ioe);
thrown.add(ioe);
continue;
}
LOG.info("Closed path " + wap.p +" (wrote " + wap.editsWritten + " edits in "
+ (wap.nanosSpent / 1000/ 1000) + "ms)");
Path dst = getCompletedRecoveredEditsFilePath(wap.p); Path dst = getCompletedRecoveredEditsFilePath(wap.p);
try { try {
if (!dst.equals(wap.p) && fs.exists(dst)) { if (!dst.equals(wap.p) && fs.exists(dst)) {
@ -1233,9 +1233,31 @@ public class HLogSplitter {
throw MultipleIOException.createIOException(thrown); throw MultipleIOException.createIOException(thrown);
} }
hasClosed = true; closeAndCleanCompleted = true;
return paths; return paths;
} }
private List<IOException> closeLogWriters(List<IOException> thrown)
throws IOException {
if (!logWritersClosed) {
if (thrown == null) {
thrown = Lists.newArrayList();
}
for (WriterAndPath wap : logWriters.values()) {
try {
wap.w.close();
} catch (IOException ioe) {
LOG.error("Couldn't close log at " + wap.p, ioe);
thrown.add(ioe);
continue;
}
LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten
+ " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms)");
}
logWritersClosed = true;
}
return thrown;
}
/** /**
* Get a writer and path for a log starting at the given entry. * Get a writer and path for a log starting at the given entry.