From 3c319811799cb4c1f51fb5b43dd4743acd28052c Mon Sep 17 00:00:00 2001
From: Viraj Jasani
Date: Sat, 20 Jun 2020 04:55:03 +0530
Subject: [PATCH] HBASE-24380 : Provide WAL splitting journal logging (#1860)

Signed-off-by: Andrew Purtell
---
 .../wal/AbstractRecoveredEditsOutputSink.java | 82 ++++++++++++-------
 .../wal/BoundedRecoveredEditsOutputSink.java  |  5 +-
 .../wal/BoundedRecoveredHFilesOutputSink.java |  2 +-
 .../apache/hadoop/hbase/wal/OutputSink.java   | 22 ++++-
 .../hbase/wal/RecoveredEditsOutputSink.java   |  8 +-
 .../apache/hadoop/hbase/wal/WALSplitter.java  | 12 ++-
 6 files changed, 92 insertions(+), 39 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
index da952eb5a3f..0da082a4caf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
@@ -57,7 +57,7 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
    * @return a writer that wraps a {@link WALProvider.Writer} and its Path. Caller should close.
    */
   protected RecoveredEditsWriter createRecoveredEditsWriter(TableName tableName, byte[] region,
-      long seqId) throws IOException {
+    long seqId) throws IOException {
     Path regionEditsPath = getRegionSplitEditsPath(tableName, region, seqId,
       walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.getTmpDirName(),
       walSplitter.conf);
@@ -70,27 +70,35 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
       }
     }
     WALProvider.Writer w = walSplitter.createWriter(regionEditsPath);
-    LOG.info("Creating recovered edits writer path={}", regionEditsPath);
+    final String msg = "Creating recovered edits writer path=" + regionEditsPath;
+    LOG.info(msg);
+    updateStatusWithMsg(msg);
     return new RecoveredEditsWriter(region, regionEditsPath, w, seqId);
   }
 
   protected Path closeRecoveredEditsWriter(RecoveredEditsWriter editsWriter,
-      List<IOException> thrown) throws IOException {
+    List<IOException> thrown) throws IOException {
     try {
       editsWriter.writer.close();
     } catch (IOException ioe) {
-      LOG.error("Could not close recovered edits at {}", editsWriter.path, ioe);
+      final String errorMsg = "Could not close recovered edits at " + editsWriter.path;
+      LOG.error(errorMsg, ioe);
+      updateStatusWithMsg(errorMsg);
       thrown.add(ioe);
       return null;
     }
-    LOG.info("Closed recovered edits writer path={} (wrote {} edits, skipped {} edits in {} ms",
-      editsWriter.path, editsWriter.editsWritten, editsWriter.editsSkipped,
-      editsWriter.nanosSpent / 1000 / 1000);
+    final String msg = "Closed recovered edits writer path=" + editsWriter.path + " (wrote "
+      + editsWriter.editsWritten + " edits, skipped " + editsWriter.editsSkipped + " edits in " + (
+      editsWriter.nanosSpent / 1000 / 1000) + " ms)";
+    LOG.info(msg);
+    updateStatusWithMsg(msg);
     if (editsWriter.editsWritten == 0) {
       // just remove the empty recovered.edits file
-      if (walSplitter.walFS.exists(editsWriter.path) &&
-        !walSplitter.walFS.delete(editsWriter.path, false)) {
-        LOG.warn("Failed deleting empty {}", editsWriter.path);
+      if (walSplitter.walFS.exists(editsWriter.path)
+        && !walSplitter.walFS.delete(editsWriter.path, false)) {
+        final String errorMsg = "Failed deleting empty " + editsWriter.path;
+        LOG.warn(errorMsg);
+        updateStatusWithMsg(errorMsg);
         throw new IOException("Failed deleting empty " + editsWriter.path);
       }
       return null;
@@ -107,13 +115,20 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
       // TestHLogSplit#testThreading is an example.
       if (walSplitter.walFS.exists(editsWriter.path)) {
         if (!walSplitter.walFS.rename(editsWriter.path, dst)) {
-          throw new IOException(
-            "Failed renaming recovered edits " + editsWriter.path + " to " + dst);
+          final String errorMsg =
+            "Failed renaming recovered edits " + editsWriter.path + " to " + dst;
+          updateStatusWithMsg(errorMsg);
+          throw new IOException(errorMsg);
         }
-        LOG.info("Rename recovered edits {} to {}", editsWriter.path, dst);
+        final String renameEditMsg = "Rename recovered edits " + editsWriter.path + " to " + dst;
+        LOG.info(renameEditMsg);
+        updateStatusWithMsg(renameEditMsg);
       }
     } catch (IOException ioe) {
-      LOG.error("Could not rename recovered edits {} to {}", editsWriter.path, dst, ioe);
+      final String errorMsg = "Could not rename recovered edits " + editsWriter.path
+        + " to " + dst;
+      LOG.error(errorMsg, ioe);
+      updateStatusWithMsg(errorMsg);
       thrown.add(ioe);
       return null;
     }
@@ -216,26 +231,33 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
 
     void writeRegionEntries(List<WAL.Entry> entries) throws IOException {
       long startTime = System.nanoTime();
-      try {
-        int editsCount = 0;
-        for (WAL.Entry logEntry : entries) {
-          filterCellByStore(logEntry);
-          if (!logEntry.getEdit().isEmpty()) {
+      int editsCount = 0;
+      for (WAL.Entry logEntry : entries) {
+        filterCellByStore(logEntry);
+        if (!logEntry.getEdit().isEmpty()) {
+          try {
             writer.append(logEntry);
-            updateRegionMaximumEditLogSeqNum(logEntry);
-            editsCount++;
-          } else {
-            incrementSkippedEdits(1);
+          } catch (IOException e) {
+            logAndThrowWriterAppendFailure(logEntry, e);
           }
+          updateRegionMaximumEditLogSeqNum(logEntry);
+          editsCount++;
+        } else {
+          incrementSkippedEdits(1);
         }
-        // Pass along summary statistics
-        incrementEdits(editsCount);
-        incrementNanoTime(System.nanoTime() - startTime);
-      } catch (IOException e) {
-        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
-        LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e);
-        throw e;
       }
+      // Pass along summary statistics
+      incrementEdits(editsCount);
+      incrementNanoTime(System.nanoTime() - startTime);
+    }
+
+    private void logAndThrowWriterAppendFailure(WAL.Entry logEntry, IOException e)
+      throws IOException {
+      e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
+      final String errorMsg = "Failed to write log entry " + logEntry.toString() + " to log";
+      LOG.error(HBaseMarkers.FATAL, errorMsg, e);
+      updateStatusWithMsg(errorMsg);
+      throw e;
     }
 
     private void filterCellByStore(WAL.Entry logEntry) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
index 1f8c195c3c7..e2aa478075c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
@@ -57,7 +57,8 @@ class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
   }
 
   @Override
-  public void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException {
+  public void append(EntryBuffers.RegionEntryBuffer buffer)
+    throws IOException {
     List<WAL.Entry> entries = buffer.entryBuffer;
     if (entries.isEmpty()) {
       LOG.warn("got an empty buffer, skipping");
@@ -86,7 +87,7 @@ class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
   public List<Path> close() throws IOException {
     boolean isSuccessful = true;
     try {
-      isSuccessful &= finishWriterThreads();
+      isSuccessful = finishWriterThreads();
     } finally {
       isSuccessful &= writeRemainingEntryBuffers();
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
index 0c6f79e93c5..50394f09bf3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
@@ -131,7 +131,7 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
   public List<Path> close() throws IOException {
     boolean isSuccessful = true;
     try {
-      isSuccessful &= finishWriterThreads();
+      isSuccessful = finishWriterThreads();
     } finally {
       isSuccessful &= writeRemainingEntryBuffers();
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
index ae9aec89c31..f60721744e2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -61,6 +62,8 @@ abstract class OutputSink {
    */
   protected final List<Path> splits = new ArrayList<>();
 
+  protected MonitoredTask status = null;
+
   /**
    * Used when close this output sink.
    */
@@ -81,6 +84,10 @@ abstract class OutputSink {
     this.reporter = reporter;
   }
 
+  void setStatus(MonitoredTask status) {
+    this.status = status;
+  }
+
   /**
    * Start the threads that will pump data from the entryBuffers to the output files.
   */
@@ -117,7 +124,9 @@ abstract class OutputSink {
       }
     }
     controller.checkForErrors();
-    LOG.info("{} split writer threads finished", this.writerThreads.size());
+    final String msg = this.writerThreads.size() + " split writer threads finished";
+    LOG.info(msg);
+    updateStatusWithMsg(msg);
     return (!progressFailed);
   }
 
@@ -132,6 +141,7 @@ abstract class OutputSink {
 
   /**
    * @param buffer A buffer of some number of edits for a given region.
+   * @throws IOException For any IO errors
    */
   abstract void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException;
 
@@ -154,6 +164,16 @@ abstract class OutputSink {
    */
   abstract boolean keepRegionEvent(WAL.Entry entry);
 
+  /**
+   * Set status message in {@link MonitoredTask} instance that is set in this OutputSink
+   * @param msg message to update the status with
+   */
+  protected final void updateStatusWithMsg(String msg) {
+    if (status != null) {
+      status.setStatus(msg);
+    }
+  }
+
   public static class WriterThread extends Thread {
     private volatile boolean shouldStop = false;
     private WALSplitter.PipelineController controller;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
index e372f6d23fd..645af60efcb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -54,7 +55,8 @@ class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
   }
 
   @Override
-  public void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException {
+  public void append(EntryBuffers.RegionEntryBuffer buffer)
+    throws IOException {
     List<WAL.Entry> entries = buffer.entryBuffer;
     if (entries.isEmpty()) {
       LOG.warn("got an empty buffer, skipping");
@@ -74,7 +76,7 @@ class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
    * @return null if this region shouldn't output any logs
    */
   private RecoveredEditsWriter getRecoveredEditsWriter(TableName tableName, byte[] region,
-      long seqId) throws IOException {
+    long seqId) throws IOException {
     RecoveredEditsWriter ret = writers.get(Bytes.toString(region));
     if (ret != null) {
       return ret;
@@ -92,7 +94,7 @@ class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
   public List<Path> close() throws IOException {
     boolean isSuccessful = true;
     try {
-      isSuccessful &= finishWriterThreads();
+      isSuccessful = finishWriterThreads();
     } finally {
       isSuccessful &= closeWriters();
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 45ecc7381d3..093f4ffcbbe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -269,6 +269,7 @@ public class WALSplitter {
     status = TaskMonitor.get().createStatus(
         "Splitting log file " + logfile.getPath() + "into a temporary staging area.");
+    status.enableStatusJournal(true);
     Reader logFileReader = null;
     this.fileBeingSplit = logfile;
     long startTS = EnvironmentEdgeManager.currentTime();
@@ -276,7 +277,7 @@ public class WALSplitter {
     long logLength = logfile.getLen();
     LOG.info("Splitting WAL={}, size={} ({} bytes)", logPath, StringUtils.humanSize(logLength),
       logLength);
-    status.setStatus("Opening log file");
+    status.setStatus("Opening log file " + logPath);
     if (reporter != null && !reporter.progress()) {
       progressFailed = true;
       return false;
@@ -291,6 +292,7 @@ public class WALSplitter {
     int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
     int numOpenedFilesLastCheck = 0;
     outputSink.setReporter(reporter);
+    outputSink.setStatus(status);
     outputSink.startWriterThreads();
     outputSinkStarted = true;
     Entry entry;
@@ -375,7 +377,9 @@ public class WALSplitter {
       e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
       throw e;
     } finally {
-      LOG.debug("Finishing writing output logs and closing down");
+      final String log = "Finishing writing output logs and closing down";
+      LOG.debug(log);
+      status.setStatus(log);
       try {
         if (null != logFileReader) {
           logFileReader.close();
@@ -400,6 +404,10 @@ public class WALSplitter {
             ", corrupted=" + isCorrupted + ", progress failed=" + progressFailed;
         LOG.info(msg);
         status.markComplete(msg);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("WAL split completed for {} , Journal Log: {}", logPath,
+            status.prettyPrintJournal());
+        }
       }
     }
     return !progressFailed;
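
---

The journal wiring above can be exercised on its own. What follows is a minimal sketch, assuming
only the MonitoredTask calls that already appear in this patch (createStatus, enableStatusJournal,
setStatus, markComplete, prettyPrintJournal); the class name StatusJournalDemo, the file names,
and the message strings are hypothetical, for illustration only.

import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;

public class StatusJournalDemo {
  public static void main(String[] args) {
    // Create a monitored task and enable its journal, so that every setStatus()
    // transition is retained rather than only the most recent one.
    MonitoredTask status = TaskMonitor.get().createStatus("Splitting log file demo.wal");
    status.enableStatusJournal(true);

    // These mirror the kind of messages the output sinks push via updateStatusWithMsg(...).
    status.setStatus("Opening log file demo.wal");
    status.setStatus("Creating recovered edits writer path=/tmp/recovered.edits/demo");
    status.markComplete("Split completed");

    // Dump the accumulated journal in one shot, the way splitLogFile() now does
    // at debug level once the split finishes.
    System.out.println(status.prettyPrintJournal());
  }
}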