diff --git a/CHANGES.txt b/CHANGES.txt
index fcc7a119a8d..e3d41d5db90 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,8 @@ Release 0.92.1 - Unreleased
    BUG FIXES
    HBASE-5176  AssignmentManager#getRegion: logging nit adds a redundant '+' (Karthik K)
    HBASE-5237  Addendum for HBASE-5160 and HBASE-4397 (Ram)
+   HBASE-5235  HLogSplitter writer thread's streams not getting closed when any
+               of the writer threads has exceptions. (Ram)
 
    TESTS
    HBASE-5223  TestMetaReaderEditor is missing call to CatalogTracker.stop()
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index 6f9417afa18..193f15a0a57 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -1142,7 +1142,9 @@ public class HLogSplitter {
     private final Set<byte[]> blacklistedRegions = Collections.synchronizedSet(
         new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
 
-    private boolean hasClosed = false;
+    private boolean closeAndCleanCompleted = false;
+
+    private boolean logWritersClosed = false;
 
     /**
      * Start the threads that will pump data from the entryBuffers
@@ -1167,20 +1169,27 @@
     List<Path> finishWritingAndClose() throws IOException {
       LOG.info("Waiting for split writer threads to finish");
 
-      for (WriterThread t : writerThreads) {
-        t.finish();
-      }
-      for (WriterThread t: writerThreads) {
-        try {
-          t.join();
-        } catch (InterruptedException ie) {
-          throw new IOException(ie);
+      try {
+        for (WriterThread t : writerThreads) {
+          t.finish();
         }
-        checkForErrors();
-      }
-      LOG.info("Split writers finished");
+        for (WriterThread t : writerThreads) {
+          try {
+            t.join();
+          } catch (InterruptedException ie) {
+            throw new IOException(ie);
+          }
+          checkForErrors();
+        }
+        LOG.info("Split writers finished");
 
-      return closeStreams();
+        return closeStreams();
+      } finally {
+        List<IOException> thrown = closeLogWriters(null);
+        if (thrown != null && !thrown.isEmpty()) {
+          throw MultipleIOException.createIOException(thrown);
+        }
+      }
     }
 
     /**
@@ -1188,21 +1197,12 @@
      * @return the list of paths written.
      */
     private List<Path> closeStreams() throws IOException {
-      Preconditions.checkState(!hasClosed);
+      Preconditions.checkState(!closeAndCleanCompleted);
 
       List<Path> paths = new ArrayList<Path>();
       List<IOException> thrown = Lists.newArrayList();
-
+      closeLogWriters(thrown);
       for (WriterAndPath wap : logWriters.values()) {
-        try {
-          wap.w.close();
-        } catch (IOException ioe) {
-          LOG.error("Couldn't close log at " + wap.p, ioe);
-          thrown.add(ioe);
-          continue;
-        }
-        LOG.info("Closed path " + wap.p +" (wrote " + wap.editsWritten + " edits in "
-            + (wap.nanosSpent / 1000/ 1000) + "ms)");
         Path dst = getCompletedRecoveredEditsFilePath(wap.p);
         try {
           if (!dst.equals(wap.p) && fs.exists(dst)) {
@@ -1233,9 +1233,31 @@
         throw MultipleIOException.createIOException(thrown);
       }
 
-      hasClosed = true;
+      closeAndCleanCompleted = true;
       return paths;
     }
+
+    private List<IOException> closeLogWriters(List<IOException> thrown)
+        throws IOException {
+      if (!logWritersClosed) {
+        if (thrown == null) {
+          thrown = Lists.newArrayList();
+        }
+        for (WriterAndPath wap : logWriters.values()) {
+          try {
+            wap.w.close();
+          } catch (IOException ioe) {
+            LOG.error("Couldn't close log at " + wap.p, ioe);
+            thrown.add(ioe);
+            continue;
+          }
+          LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten
+              + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms)");
+        }
+        logWritersClosed = true;
+      }
+      return thrown;
+    }
 
     /**
      * Get a writer and path for a log starting at the given entry.