From d1449231f06f670f90a230ec36807a3a76850a75 Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Mon, 22 Jun 2020 23:43:12 +0530 Subject: [PATCH] HBASE-24380 : Provide WAL splitting journal logging (#1860) (#1939) Signed-off-by: Andrew Purtell --- .../apache/hadoop/hbase/wal/WALSplitter.java | 138 +++++++++++++----- 1 file changed, 101 insertions(+), 37 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index f769e6db18d..9273b6a490e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -312,13 +312,14 @@ public class WALSplitter { status = TaskMonitor.get().createStatus( "Splitting log file " + logfile.getPath() + "into a temporary staging area."); + status.enableStatusJournal(true); Reader in = null; this.fileBeingSplit = logfile; try { long logLength = logfile.getLen(); LOG.info("Splitting wal: " + logPath + ", length=" + logLength); LOG.info("DistributedLogReplay = " + this.distributedLogReplay); - status.setStatus("Opening log file"); + status.setStatus("Opening log file " + logPath); if (reporter != null && !reporter.progress()) { progress_failed = true; return false; @@ -346,6 +347,7 @@ public class WALSplitter { int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3); int numOpenedFilesLastCheck = 0; outputSink.setReporter(reporter); + outputSink.setStatus(status); outputSink.startWriterThreads(); outputSinkStarted = true; Entry entry; @@ -436,7 +438,9 @@ public class WALSplitter { e = RemoteExceptionHandler.checkIOException(e); throw e; } finally { - LOG.debug("Finishing writing output logs and closing down."); + final String log = "Finishing writing output logs and closing down"; + LOG.debug(log); + status.setStatus(log); try { if (null != in) { in.close(); @@ -460,6 +464,10 @@ public class WALSplitter { ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed; LOG.info(msg); status.markComplete(msg); + if (LOG.isDebugEnabled()) { + LOG.debug("WAL split completed for " + logPath + " , Journal Log: " + + status.prettyPrintJournal()); + } } } return !progress_failed; @@ -1210,6 +1218,8 @@ public class WALSplitter { protected List splits = null; + protected MonitoredTask status = null; + public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) { numThreads = numWriters; this.controller = controller; @@ -1262,6 +1272,10 @@ public class WALSplitter { return this.skippedEdits.get(); } + void setStatus(MonitoredTask status) { + this.status = status; + } + /** * Wait for writer threads to dump all info to the sink * @return true when there is no error @@ -1292,7 +1306,9 @@ public class WALSplitter { } } controller.checkForErrors(); - LOG.info(this.writerThreads.size() + " split writers finished; closing..."); + final String msg = this.writerThreads.size() + " split writer threads finished"; + LOG.info(msg); + updateStatusWithMsg(msg); return (!progress_failed); } @@ -1329,6 +1345,17 @@ public class WALSplitter { * @return Return true if this sink wants to accept this region-level WALEdit. */ public abstract boolean keepRegionEvent(Entry entry); + + /** + * Set status message in {@link MonitoredTask} instance that is set in this OutputSink + * + * @param msg message to update the status with + */ + protected final void updateStatusWithMsg(String msg) { + if (status != null) { + status.setStatus(msg); + } + } } /** @@ -1386,19 +1413,29 @@ public class WALSplitter { } } if (wap.minLogSeqNum < dstMinLogSeqNum) { - LOG.warn("Found existing old edits file. It could be the result of a previous failed" + final String errorMsg = + "Found existing old edits file. It could be the result of a previous failed" + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length=" - + walFS.getFileStatus(dst).getLen()); + + walFS.getFileStatus(dst).getLen(); + LOG.warn(errorMsg); + updateStatusWithMsg(errorMsg); if (!walFS.delete(dst, false)) { - LOG.warn("Failed deleting of old " + dst); + final String msg = "Failed deleting of old " + dst; + LOG.warn(msg); + updateStatusWithMsg(msg); throw new IOException("Failed deleting of old " + dst); } } else { - LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p - + ", length=" + walFS.getFileStatus(wap.p).getLen()); + final String errorMsg = + "Found existing old edits file and we have less entries. Deleting " + wap.p + ", length=" + + walFS.getFileStatus(wap.p).getLen(); + LOG.warn(errorMsg); + updateStatusWithMsg(errorMsg); if (!walFS.delete(wap.p, false)) { - LOG.warn("Failed deleting of " + wap.p); - throw new IOException("Failed deleting of " + wap.p); + final String failureMsg = "Failed deleting of " + wap.p; + LOG.warn(failureMsg); + updateStatusWithMsg(failureMsg); + throw new IOException(failureMsg); } } } @@ -1484,19 +1521,24 @@ public class WALSplitter { try { wap.w.close(); } catch (IOException ioe) { - LOG.error("Couldn't close log at " + wap.p, ioe); + final String errorMsg = "Couldn't close log at " + wap.p; + LOG.error(errorMsg, ioe); + updateStatusWithMsg(errorMsg); thrown.add(ioe); return null; } + final String msg = + "Closed wap " + wap.p + " (wrote " + wap.editsWritten + " edits, skipped " + + wap.editsSkipped + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms"; if (LOG.isDebugEnabled()) { - LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten - + " edits, skipped " + wap.editsSkipped + " edits in " - + (wap.nanosSpent / 1000 / 1000) + "ms"); + LOG.debug(msg); } + updateStatusWithMsg(msg); if (wap.editsWritten == 0) { // just remove the empty recovered.edits file if (walFS.exists(wap.p) && !walFS.delete(wap.p, false)) { - LOG.warn("Failed deleting empty " + wap.p); + final String errorMsg = "Failed deleting empty " + wap.p; + LOG.warn(errorMsg); throw new IOException("Failed deleting empty " + wap.p); } return null; @@ -1513,12 +1555,18 @@ public class WALSplitter { // TestHLogSplit#testThreading is an example. if (walFS.exists(wap.p)) { if (!walFS.rename(wap.p, dst)) { - throw new IOException("Failed renaming " + wap.p + " to " + dst); + final String errorMsg = "Failed renaming " + wap.p + " to " + dst; + updateStatusWithMsg(errorMsg); + throw new IOException(errorMsg); } - LOG.info("Rename " + wap.p + " to " + dst); + final String renameLog = "Rename " + wap.p + " to " + dst; + LOG.info(renameLog); + updateStatusWithMsg(renameLog); } } catch (IOException ioe) { - LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe); + final String errorMsg = "Couldn't rename " + wap.p + " to " + dst; + LOG.error(errorMsg, ioe); + updateStatusWithMsg(errorMsg); thrown.add(ioe); return null; } @@ -1555,13 +1603,17 @@ public class WALSplitter { wap = (WriterAndPath) tmpWAP; wap.w.close(); } catch (IOException ioe) { - LOG.error("Couldn't close log at " + wap.p, ioe); + final String errorMsg = "Couldn't close log at " + wap.p; + LOG.error(errorMsg, ioe); + updateStatusWithMsg(errorMsg); thrown.add(ioe); continue; } - LOG.info( - "Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + (wap.nanosSpent - / 1000 / 1000) + "ms)"); + final String msg = + "Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + (wap.nanosSpent + / 1000 / 1000) + "ms)"; + LOG.info(msg); + updateStatusWithMsg(msg); } writersClosed = true; } @@ -1610,15 +1662,21 @@ public class WALSplitter { return null; } if (walFS.exists(regionedits)) { - LOG.warn("Found old edits file. It could be the " - + "result of a previous failed split attempt. Deleting " + regionedits + ", length=" - + walFS.getFileStatus(regionedits).getLen()); + final String warnMsg = "Found old edits file. It could be the " + + "result of a previous failed split attempt. Deleting " + regionedits + ", length=" + + walFS.getFileStatus(regionedits).getLen(); + LOG.warn(warnMsg); + updateStatusWithMsg(warnMsg); if (!walFS.delete(regionedits, false)) { - LOG.warn("Failed delete of old " + regionedits); + final String errorMsg = "Failed delete of old " + regionedits; + LOG.warn(errorMsg); + updateStatusWithMsg(errorMsg); } } Writer w = createWriter(regionedits); - LOG.debug("Creating writer path=" + regionedits); + final String msg = "Creating writer path=" + regionedits; + LOG.debug(msg); + updateStatusWithMsg(msg); return new WriterAndPath(regionedits, w, entry.getKey().getLogSeqNum()); } @@ -1666,10 +1724,10 @@ public class WALSplitter { WriterAndPath wap = null; long startTime = System.nanoTime(); - try { - int editsCount = 0; + int editsCount = 0; - for (Entry logEntry : entries) { + for (Entry logEntry : entries) { + try { if (wap == null) { wap = getWriterAndPath(logEntry, reusable); if (wap == null) { @@ -1687,18 +1745,24 @@ public class WALSplitter { } else { wap.incrementSkippedEdits(1); } + } catch (IOException e) { + logAndThrowWriterAppendFailure(logEntry, e); } - // Pass along summary statistics - wap.incrementEdits(editsCount); - wap.incrementNanoTime(System.nanoTime() - startTime); - } catch (IOException e) { - e = RemoteExceptionHandler.checkIOException(e); - LOG.fatal(" Got while writing log entry to log", e); - throw e; } + // Pass along summary statistics + wap.incrementEdits(editsCount); + wap.incrementNanoTime(System.nanoTime() - startTime); return wap; } + private void logAndThrowWriterAppendFailure(Entry logEntry, IOException e) throws IOException { + e = RemoteExceptionHandler.checkIOException(e); + final String errorMsg = "Failed to write log entry " + logEntry.toString() + " to log"; + LOG.fatal(errorMsg, e); + updateStatusWithMsg(errorMsg); + throw e; + } + @Override public boolean keepRegionEvent(Entry entry) { ArrayList cells = entry.getEdit().getCells();