HBASE-24380 : Provide WAL splitting journal logging (#1860) (#1939)

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Viraj Jasani 2020-06-22 23:43:12 +05:30 committed by GitHub
parent 0224dccdb8
commit d1449231f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 101 additions and 37 deletions

View File

@ -312,13 +312,14 @@ public class WALSplitter {
status =
TaskMonitor.get().createStatus(
"Splitting log file " + logfile.getPath() + "into a temporary staging area.");
status.enableStatusJournal(true);
Reader in = null;
this.fileBeingSplit = logfile;
try {
long logLength = logfile.getLen();
LOG.info("Splitting wal: " + logPath + ", length=" + logLength);
LOG.info("DistributedLogReplay = " + this.distributedLogReplay);
status.setStatus("Opening log file");
status.setStatus("Opening log file " + logPath);
if (reporter != null && !reporter.progress()) {
progress_failed = true;
return false;
@ -346,6 +347,7 @@ public class WALSplitter {
int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
int numOpenedFilesLastCheck = 0;
outputSink.setReporter(reporter);
outputSink.setStatus(status);
outputSink.startWriterThreads();
outputSinkStarted = true;
Entry entry;
@ -436,7 +438,9 @@ public class WALSplitter {
e = RemoteExceptionHandler.checkIOException(e);
throw e;
} finally {
LOG.debug("Finishing writing output logs and closing down.");
final String log = "Finishing writing output logs and closing down";
LOG.debug(log);
status.setStatus(log);
try {
if (null != in) {
in.close();
@ -460,6 +464,10 @@ public class WALSplitter {
", corrupted=" + isCorrupted + ", progress failed=" + progress_failed;
LOG.info(msg);
status.markComplete(msg);
if (LOG.isDebugEnabled()) {
LOG.debug("WAL split completed for " + logPath + " , Journal Log: "
+ status.prettyPrintJournal());
}
}
}
return !progress_failed;
@ -1210,6 +1218,8 @@ public class WALSplitter {
protected List<Path> splits = null;
protected MonitoredTask status = null;
public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
numThreads = numWriters;
this.controller = controller;
@ -1262,6 +1272,10 @@ public class WALSplitter {
return this.skippedEdits.get();
}
void setStatus(MonitoredTask status) {
this.status = status;
}
/**
* Wait for writer threads to dump all info to the sink
* @return true when there is no error
@ -1292,7 +1306,9 @@ public class WALSplitter {
}
}
controller.checkForErrors();
LOG.info(this.writerThreads.size() + " split writers finished; closing...");
final String msg = this.writerThreads.size() + " split writer threads finished";
LOG.info(msg);
updateStatusWithMsg(msg);
return (!progress_failed);
}
@ -1329,6 +1345,17 @@ public class WALSplitter {
* @return Return true if this sink wants to accept this region-level WALEdit.
*/
public abstract boolean keepRegionEvent(Entry entry);
/**
* Set status message in {@link MonitoredTask} instance that is set in this OutputSink
*
* @param msg message to update the status with
*/
protected final void updateStatusWithMsg(String msg) {
if (status != null) {
status.setStatus(msg);
}
}
}
/**
@ -1386,19 +1413,29 @@ public class WALSplitter {
}
}
if (wap.minLogSeqNum < dstMinLogSeqNum) {
LOG.warn("Found existing old edits file. It could be the result of a previous failed"
final String errorMsg =
"Found existing old edits file. It could be the result of a previous failed"
+ " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
+ walFS.getFileStatus(dst).getLen());
+ walFS.getFileStatus(dst).getLen();
LOG.warn(errorMsg);
updateStatusWithMsg(errorMsg);
if (!walFS.delete(dst, false)) {
LOG.warn("Failed deleting of old " + dst);
final String msg = "Failed deleting of old " + dst;
LOG.warn(msg);
updateStatusWithMsg(msg);
throw new IOException("Failed deleting of old " + dst);
}
} else {
LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p
+ ", length=" + walFS.getFileStatus(wap.p).getLen());
final String errorMsg =
"Found existing old edits file and we have less entries. Deleting " + wap.p + ", length="
+ walFS.getFileStatus(wap.p).getLen();
LOG.warn(errorMsg);
updateStatusWithMsg(errorMsg);
if (!walFS.delete(wap.p, false)) {
LOG.warn("Failed deleting of " + wap.p);
throw new IOException("Failed deleting of " + wap.p);
final String failureMsg = "Failed deleting of " + wap.p;
LOG.warn(failureMsg);
updateStatusWithMsg(failureMsg);
throw new IOException(failureMsg);
}
}
}
@ -1484,19 +1521,24 @@ public class WALSplitter {
try {
wap.w.close();
} catch (IOException ioe) {
LOG.error("Couldn't close log at " + wap.p, ioe);
final String errorMsg = "Couldn't close log at " + wap.p;
LOG.error(errorMsg, ioe);
updateStatusWithMsg(errorMsg);
thrown.add(ioe);
return null;
}
final String msg =
"Closed wap " + wap.p + " (wrote " + wap.editsWritten + " edits, skipped "
+ wap.editsSkipped + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms";
if (LOG.isDebugEnabled()) {
LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
+ " edits, skipped " + wap.editsSkipped + " edits in "
+ (wap.nanosSpent / 1000 / 1000) + "ms");
LOG.debug(msg);
}
updateStatusWithMsg(msg);
if (wap.editsWritten == 0) {
// just remove the empty recovered.edits file
if (walFS.exists(wap.p) && !walFS.delete(wap.p, false)) {
LOG.warn("Failed deleting empty " + wap.p);
final String errorMsg = "Failed deleting empty " + wap.p;
LOG.warn(errorMsg);
throw new IOException("Failed deleting empty " + wap.p);
}
return null;
@ -1513,12 +1555,18 @@ public class WALSplitter {
// TestHLogSplit#testThreading is an example.
if (walFS.exists(wap.p)) {
if (!walFS.rename(wap.p, dst)) {
throw new IOException("Failed renaming " + wap.p + " to " + dst);
final String errorMsg = "Failed renaming " + wap.p + " to " + dst;
updateStatusWithMsg(errorMsg);
throw new IOException(errorMsg);
}
LOG.info("Rename " + wap.p + " to " + dst);
final String renameLog = "Rename " + wap.p + " to " + dst;
LOG.info(renameLog);
updateStatusWithMsg(renameLog);
}
} catch (IOException ioe) {
LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
final String errorMsg = "Couldn't rename " + wap.p + " to " + dst;
LOG.error(errorMsg, ioe);
updateStatusWithMsg(errorMsg);
thrown.add(ioe);
return null;
}
@ -1555,13 +1603,17 @@ public class WALSplitter {
wap = (WriterAndPath) tmpWAP;
wap.w.close();
} catch (IOException ioe) {
LOG.error("Couldn't close log at " + wap.p, ioe);
final String errorMsg = "Couldn't close log at " + wap.p;
LOG.error(errorMsg, ioe);
updateStatusWithMsg(errorMsg);
thrown.add(ioe);
continue;
}
LOG.info(
final String msg =
"Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + (wap.nanosSpent
/ 1000 / 1000) + "ms)");
/ 1000 / 1000) + "ms)";
LOG.info(msg);
updateStatusWithMsg(msg);
}
writersClosed = true;
}
@ -1610,15 +1662,21 @@ public class WALSplitter {
return null;
}
if (walFS.exists(regionedits)) {
LOG.warn("Found old edits file. It could be the "
final String warnMsg = "Found old edits file. It could be the "
+ "result of a previous failed split attempt. Deleting " + regionedits + ", length="
+ walFS.getFileStatus(regionedits).getLen());
+ walFS.getFileStatus(regionedits).getLen();
LOG.warn(warnMsg);
updateStatusWithMsg(warnMsg);
if (!walFS.delete(regionedits, false)) {
LOG.warn("Failed delete of old " + regionedits);
final String errorMsg = "Failed delete of old " + regionedits;
LOG.warn(errorMsg);
updateStatusWithMsg(errorMsg);
}
}
Writer w = createWriter(regionedits);
LOG.debug("Creating writer path=" + regionedits);
final String msg = "Creating writer path=" + regionedits;
LOG.debug(msg);
updateStatusWithMsg(msg);
return new WriterAndPath(regionedits, w, entry.getKey().getLogSeqNum());
}
@ -1666,10 +1724,10 @@ public class WALSplitter {
WriterAndPath wap = null;
long startTime = System.nanoTime();
try {
int editsCount = 0;
for (Entry logEntry : entries) {
try {
if (wap == null) {
wap = getWriterAndPath(logEntry, reusable);
if (wap == null) {
@ -1687,18 +1745,24 @@ public class WALSplitter {
} else {
wap.incrementSkippedEdits(1);
}
} catch (IOException e) {
logAndThrowWriterAppendFailure(logEntry, e);
}
}
// Pass along summary statistics
wap.incrementEdits(editsCount);
wap.incrementNanoTime(System.nanoTime() - startTime);
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
LOG.fatal(" Got while writing log entry to log", e);
throw e;
}
return wap;
}
private void logAndThrowWriterAppendFailure(Entry logEntry, IOException e) throws IOException {
e = RemoteExceptionHandler.checkIOException(e);
final String errorMsg = "Failed to write log entry " + logEntry.toString() + " to log";
LOG.fatal(errorMsg, e);
updateStatusWithMsg(errorMsg);
throw e;
}
@Override
public boolean keepRegionEvent(Entry entry) {
ArrayList<Cell> cells = entry.getEdit().getCells();