HBASE-24380 : Provide WAL splitting journal logging (#1860)

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Viraj Jasani 2020-06-20 04:55:03 +05:30 committed by GitHub
parent 6b62c0b6d2
commit 3c31981179
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 92 additions and 39 deletions

View File

@ -70,7 +70,9 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
} }
} }
WALProvider.Writer w = walSplitter.createWriter(regionEditsPath); WALProvider.Writer w = walSplitter.createWriter(regionEditsPath);
LOG.info("Creating recovered edits writer path={}", regionEditsPath); final String msg = "Creating recovered edits writer path=" + regionEditsPath;
LOG.info(msg);
updateStatusWithMsg(msg);
return new RecoveredEditsWriter(region, regionEditsPath, w, seqId); return new RecoveredEditsWriter(region, regionEditsPath, w, seqId);
} }
@ -79,18 +81,24 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
try { try {
editsWriter.writer.close(); editsWriter.writer.close();
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.error("Could not close recovered edits at {}", editsWriter.path, ioe); final String errorMsg = "Could not close recovered edits at " + editsWriter.path;
LOG.error(errorMsg, ioe);
updateStatusWithMsg(errorMsg);
thrown.add(ioe); thrown.add(ioe);
return null; return null;
} }
LOG.info("Closed recovered edits writer path={} (wrote {} edits, skipped {} edits in {} ms", final String msg = "Closed recovered edits writer path=" + editsWriter.path + " (wrote "
editsWriter.path, editsWriter.editsWritten, editsWriter.editsSkipped, + editsWriter.editsWritten + " edits, skipped " + editsWriter.editsSkipped + " edits in " + (
editsWriter.nanosSpent / 1000 / 1000); editsWriter.nanosSpent / 1000 / 1000) + " ms)";
LOG.info(msg);
updateStatusWithMsg(msg);
if (editsWriter.editsWritten == 0) { if (editsWriter.editsWritten == 0) {
// just remove the empty recovered.edits file // just remove the empty recovered.edits file
if (walSplitter.walFS.exists(editsWriter.path) && if (walSplitter.walFS.exists(editsWriter.path)
!walSplitter.walFS.delete(editsWriter.path, false)) { && !walSplitter.walFS.delete(editsWriter.path, false)) {
LOG.warn("Failed deleting empty {}", editsWriter.path); final String errorMsg = "Failed deleting empty " + editsWriter.path;
LOG.warn(errorMsg);
updateStatusWithMsg(errorMsg);
throw new IOException("Failed deleting empty " + editsWriter.path); throw new IOException("Failed deleting empty " + editsWriter.path);
} }
return null; return null;
@ -107,13 +115,20 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
// TestHLogSplit#testThreading is an example. // TestHLogSplit#testThreading is an example.
if (walSplitter.walFS.exists(editsWriter.path)) { if (walSplitter.walFS.exists(editsWriter.path)) {
if (!walSplitter.walFS.rename(editsWriter.path, dst)) { if (!walSplitter.walFS.rename(editsWriter.path, dst)) {
throw new IOException( final String errorMsg =
"Failed renaming recovered edits " + editsWriter.path + " to " + dst); "Failed renaming recovered edits " + editsWriter.path + " to " + dst;
updateStatusWithMsg(errorMsg);
throw new IOException(errorMsg);
} }
LOG.info("Rename recovered edits {} to {}", editsWriter.path, dst); final String renameEditMsg = "Rename recovered edits " + editsWriter.path + " to " + dst;
LOG.info(renameEditMsg);
updateStatusWithMsg(renameEditMsg);
} }
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.error("Could not rename recovered edits {} to {}", editsWriter.path, dst, ioe); final String errorMsg = "Could not rename recovered edits " + editsWriter.path
+ " to " + dst;
LOG.error(errorMsg, ioe);
updateStatusWithMsg(errorMsg);
thrown.add(ioe); thrown.add(ioe);
return null; return null;
} }
@ -216,12 +231,15 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
void writeRegionEntries(List<WAL.Entry> entries) throws IOException { void writeRegionEntries(List<WAL.Entry> entries) throws IOException {
long startTime = System.nanoTime(); long startTime = System.nanoTime();
try {
int editsCount = 0; int editsCount = 0;
for (WAL.Entry logEntry : entries) { for (WAL.Entry logEntry : entries) {
filterCellByStore(logEntry); filterCellByStore(logEntry);
if (!logEntry.getEdit().isEmpty()) { if (!logEntry.getEdit().isEmpty()) {
try {
writer.append(logEntry); writer.append(logEntry);
} catch (IOException e) {
logAndThrowWriterAppendFailure(logEntry, e);
}
updateRegionMaximumEditLogSeqNum(logEntry); updateRegionMaximumEditLogSeqNum(logEntry);
editsCount++; editsCount++;
} else { } else {
@ -231,11 +249,15 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
// Pass along summary statistics // Pass along summary statistics
incrementEdits(editsCount); incrementEdits(editsCount);
incrementNanoTime(System.nanoTime() - startTime); incrementNanoTime(System.nanoTime() - startTime);
} catch (IOException e) {
e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e);
throw e;
} }
/**
 * Reports a failed append of a single WAL entry and rethrows the failure.
 * A {@link RemoteException} is first replaced by the result of its
 * {@code unwrapRemoteException()} call; the exception is then logged at error level with the
 * FATAL marker, the same message is passed to {@code updateStatusWithMsg}, and the exception
 * is thrown to the caller.
 * @param logEntry the entry that could not be written; its string form is put in the message
 * @param e the exception caught around the append call
 * @throws IOException always: {@code e} itself, or its unwrapped form if it was remote
 */
private void logAndThrowWriterAppendFailure(WAL.Entry logEntry, IOException e)
throws IOException {
// Unwrap before reporting so the log and the caller see the same exception.
e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
final String errorMsg = "Failed to write log entry " + logEntry.toString() + " to log";
LOG.error(HBaseMarkers.FATAL, errorMsg, e);
updateStatusWithMsg(errorMsg);
throw e;
} }
private void filterCellByStore(WAL.Entry logEntry) { private void filterCellByStore(WAL.Entry logEntry) {

View File

@ -57,7 +57,8 @@ class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
} }
@Override @Override
public void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException { public void append(EntryBuffers.RegionEntryBuffer buffer)
throws IOException {
List<WAL.Entry> entries = buffer.entryBuffer; List<WAL.Entry> entries = buffer.entryBuffer;
if (entries.isEmpty()) { if (entries.isEmpty()) {
LOG.warn("got an empty buffer, skipping"); LOG.warn("got an empty buffer, skipping");
@ -86,7 +87,7 @@ class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
public List<Path> close() throws IOException { public List<Path> close() throws IOException {
boolean isSuccessful = true; boolean isSuccessful = true;
try { try {
isSuccessful &= finishWriterThreads(); isSuccessful = finishWriterThreads();
} finally { } finally {
isSuccessful &= writeRemainingEntryBuffers(); isSuccessful &= writeRemainingEntryBuffers();
} }

View File

@ -131,7 +131,7 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
public List<Path> close() throws IOException { public List<Path> close() throws IOException {
boolean isSuccessful = true; boolean isSuccessful = true;
try { try {
isSuccessful &= finishWriterThreads(); isSuccessful = finishWriterThreads();
} finally { } finally {
isSuccessful &= writeRemainingEntryBuffers(); isSuccessful &= writeRemainingEntryBuffers();
} }

View File

@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -61,6 +62,8 @@ abstract class OutputSink {
*/ */
protected final List<Path> splits = new ArrayList<>(); protected final List<Path> splits = new ArrayList<>();
protected MonitoredTask status = null;
/** /**
* Used when closing this output sink. * Used when closing this output sink.
*/ */
@ -81,6 +84,10 @@ abstract class OutputSink {
this.reporter = reporter; this.reporter = reporter;
} }
/**
 * Sets the {@link MonitoredTask} that {@link #updateStatusWithMsg(String)} writes to.
 * The field starts out as {@code null}, in which case status updates are skipped.
 * @param status task to receive status messages
 */
void setStatus(MonitoredTask status) {
this.status = status;
}
/** /**
* Start the threads that will pump data from the entryBuffers to the output files. * Start the threads that will pump data from the entryBuffers to the output files.
*/ */
@ -117,7 +124,9 @@ abstract class OutputSink {
} }
} }
controller.checkForErrors(); controller.checkForErrors();
LOG.info("{} split writer threads finished", this.writerThreads.size()); final String msg = this.writerThreads.size() + " split writer threads finished";
LOG.info(msg);
updateStatusWithMsg(msg);
return (!progressFailed); return (!progressFailed);
} }
@ -132,6 +141,7 @@ abstract class OutputSink {
/** /**
* @param buffer A buffer of some number of edits for a given region. * @param buffer A buffer of some number of edits for a given region.
* @throws IOException For any IO errors
*/ */
abstract void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException; abstract void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException;
@ -154,6 +164,16 @@ abstract class OutputSink {
*/ */
abstract boolean keepRegionEvent(WAL.Entry entry); abstract boolean keepRegionEvent(WAL.Entry entry);
/**
 * Forwards a message to the {@link MonitoredTask} attached to this OutputSink.
 * Does nothing when no task has been set.
 * @param msg text to record as the task's current status
 */
protected final void updateStatusWithMsg(String msg) {
  if (status == null) {
    // No task attached, so there is nowhere to report to.
    return;
  }
  status.setStatus(msg);
}
public static class WriterThread extends Thread { public static class WriterThread extends Thread {
private volatile boolean shouldStop = false; private volatile boolean shouldStop = false;
private WALSplitter.PipelineController controller; private WALSplitter.PipelineController controller;

View File

@ -28,6 +28,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.io.MultipleIOException;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -54,7 +55,8 @@ class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
} }
@Override @Override
public void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException { public void append(EntryBuffers.RegionEntryBuffer buffer)
throws IOException {
List<WAL.Entry> entries = buffer.entryBuffer; List<WAL.Entry> entries = buffer.entryBuffer;
if (entries.isEmpty()) { if (entries.isEmpty()) {
LOG.warn("got an empty buffer, skipping"); LOG.warn("got an empty buffer, skipping");
@ -92,7 +94,7 @@ class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
public List<Path> close() throws IOException { public List<Path> close() throws IOException {
boolean isSuccessful = true; boolean isSuccessful = true;
try { try {
isSuccessful &= finishWriterThreads(); isSuccessful = finishWriterThreads();
} finally { } finally {
isSuccessful &= closeWriters(); isSuccessful &= closeWriters();
} }

View File

@ -269,6 +269,7 @@ public class WALSplitter {
status = TaskMonitor.get().createStatus( status = TaskMonitor.get().createStatus(
"Splitting log file " + logfile.getPath() + "into a temporary staging area."); "Splitting log file " + logfile.getPath() + "into a temporary staging area.");
status.enableStatusJournal(true);
Reader logFileReader = null; Reader logFileReader = null;
this.fileBeingSplit = logfile; this.fileBeingSplit = logfile;
long startTS = EnvironmentEdgeManager.currentTime(); long startTS = EnvironmentEdgeManager.currentTime();
@ -276,7 +277,7 @@ public class WALSplitter {
long logLength = logfile.getLen(); long logLength = logfile.getLen();
LOG.info("Splitting WAL={}, size={} ({} bytes)", logPath, StringUtils.humanSize(logLength), LOG.info("Splitting WAL={}, size={} ({} bytes)", logPath, StringUtils.humanSize(logLength),
logLength); logLength);
status.setStatus("Opening log file"); status.setStatus("Opening log file " + logPath);
if (reporter != null && !reporter.progress()) { if (reporter != null && !reporter.progress()) {
progressFailed = true; progressFailed = true;
return false; return false;
@ -291,6 +292,7 @@ public class WALSplitter {
int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3); int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
int numOpenedFilesLastCheck = 0; int numOpenedFilesLastCheck = 0;
outputSink.setReporter(reporter); outputSink.setReporter(reporter);
outputSink.setStatus(status);
outputSink.startWriterThreads(); outputSink.startWriterThreads();
outputSinkStarted = true; outputSinkStarted = true;
Entry entry; Entry entry;
@ -375,7 +377,9 @@ public class WALSplitter {
e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
throw e; throw e;
} finally { } finally {
LOG.debug("Finishing writing output logs and closing down"); final String log = "Finishing writing output logs and closing down";
LOG.debug(log);
status.setStatus(log);
try { try {
if (null != logFileReader) { if (null != logFileReader) {
logFileReader.close(); logFileReader.close();
@ -400,6 +404,10 @@ public class WALSplitter {
", corrupted=" + isCorrupted + ", progress failed=" + progressFailed; ", corrupted=" + isCorrupted + ", progress failed=" + progressFailed;
LOG.info(msg); LOG.info(msg);
status.markComplete(msg); status.markComplete(msg);
if (LOG.isDebugEnabled()) {
LOG.debug("WAL split completed for {} , Journal Log: {}", logPath,
status.prettyPrintJournal());
}
} }
} }
return !progressFailed; return !progressFailed;