HBASE-24380 : Provide WAL splitting journal logging (#1860)

Signed-off-by: Andrew Purtell <apurtell@apache.org>

Conflicts:
	hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
parent 30a6c63827
commit 48fda91c5b
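This change threads the progress messages emitted during WAL splitting into the split task's MonitoredTask journal, so the whole history of a split (writers created, edits written and skipped, files renamed, failures) can be read back afterwards instead of only the most recent status line. The sketch below is an illustration, not code from this commit; it uses only the MonitoredTask calls that appear in the diff that follows.

    import org.apache.hadoop.hbase.monitoring.MonitoredTask;
    import org.apache.hadoop.hbase.monitoring.TaskMonitor;

    class WalSplitJournalSketch {
      void demo() {
        MonitoredTask status = TaskMonitor.get().createStatus("Splitting a WAL");
        status.enableStatusJournal(true);     // record every status transition
        status.setStatus("Opening log file"); // journaled, not just overwritten
        status.setStatus("Closing writers");
        status.markComplete("Split finished");
        // The journal keeps each message even after the task completes.
        System.out.println(status.prettyPrintJournal());
      }
    }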
hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java

@@ -57,7 +57,7 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
    * @return a writer that wraps a {@link WALProvider.Writer} and its Path. Caller should close.
    */
   protected RecoveredEditsWriter createRecoveredEditsWriter(TableName tableName, byte[] region,
       long seqId) throws IOException {
     Path regionEditsPath = getRegionSplitEditsPath(tableName, region, seqId,
       walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.getTmpDirName(),
       walSplitter.conf);
@@ -70,27 +70,35 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
       }
     }
     WALProvider.Writer w = walSplitter.createWriter(regionEditsPath);
-    LOG.info("Creating recovered edits writer path={}", regionEditsPath);
+    final String msg = "Creating recovered edits writer path=" + regionEditsPath;
+    LOG.info(msg);
+    updateStatusWithMsg(msg);
     return new RecoveredEditsWriter(region, regionEditsPath, w, seqId);
   }
 
   protected Path closeRecoveredEditsWriter(RecoveredEditsWriter editsWriter,
       List<IOException> thrown) throws IOException {
     try {
       editsWriter.writer.close();
     } catch (IOException ioe) {
-      LOG.error("Could not close recovered edits at {}", editsWriter.path, ioe);
+      final String errorMsg = "Could not close recovered edits at " + editsWriter.path;
+      LOG.error(errorMsg, ioe);
+      updateStatusWithMsg(errorMsg);
       thrown.add(ioe);
       return null;
     }
-    LOG.info("Closed recovered edits writer path={} (wrote {} edits, skipped {} edits in {} ms",
-      editsWriter.path, editsWriter.editsWritten, editsWriter.editsSkipped,
-      editsWriter.nanosSpent / 1000 / 1000);
+    final String msg = "Closed recovered edits writer path=" + editsWriter.path + " (wrote "
+      + editsWriter.editsWritten + " edits, skipped " + editsWriter.editsSkipped + " edits in " + (
+        editsWriter.nanosSpent / 1000 / 1000) + " ms)";
+    LOG.info(msg);
+    updateStatusWithMsg(msg);
     if (editsWriter.editsWritten == 0) {
       // just remove the empty recovered.edits file
-      if (walSplitter.walFS.exists(editsWriter.path) &&
-        !walSplitter.walFS.delete(editsWriter.path, false)) {
-        LOG.warn("Failed deleting empty {}", editsWriter.path);
+      if (walSplitter.walFS.exists(editsWriter.path)
+        && !walSplitter.walFS.delete(editsWriter.path, false)) {
+        final String errorMsg = "Failed deleting empty " + editsWriter.path;
+        LOG.warn(errorMsg);
+        updateStatusWithMsg(errorMsg);
         throw new IOException("Failed deleting empty " + editsWriter.path);
       }
       return null;
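The hunk above replaces parameterized logging with messages built once as plain strings, because the same text now goes to two places. This is the pattern repeated throughout the commit (a fragment using the names from the diff; updateStatusWithMsg is a no-op until WALSplitter attaches a MonitoredTask):

    final String msg = "Closed recovered edits writer path=" + editsWriter.path;
    LOG.info(msg);            // same text to SLF4J...
    updateStatusWithMsg(msg); // ...and to the MonitoredTask journal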
@@ -107,13 +115,20 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
       // TestHLogSplit#testThreading is an example.
       if (walSplitter.walFS.exists(editsWriter.path)) {
         if (!walSplitter.walFS.rename(editsWriter.path, dst)) {
-          throw new IOException(
-            "Failed renaming recovered edits " + editsWriter.path + " to " + dst);
+          final String errorMsg =
+            "Failed renaming recovered edits " + editsWriter.path + " to " + dst;
+          updateStatusWithMsg(errorMsg);
+          throw new IOException(errorMsg);
         }
-        LOG.info("Rename recovered edits {} to {}", editsWriter.path, dst);
+        final String renameEditMsg = "Rename recovered edits " + editsWriter.path + " to " + dst;
+        LOG.info(renameEditMsg);
+        updateStatusWithMsg(renameEditMsg);
       }
     } catch (IOException ioe) {
-      LOG.error("Could not rename recovered edits {} to {}", editsWriter.path, dst, ioe);
+      final String errorMsg = "Could not rename recovered edits " + editsWriter.path
+        + " to " + dst;
+      LOG.error(errorMsg, ioe);
+      updateStatusWithMsg(errorMsg);
       thrown.add(ioe);
       return null;
     }
@@ -216,26 +231,33 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
 
     void writeRegionEntries(List<WAL.Entry> entries) throws IOException {
       long startTime = System.nanoTime();
-      try {
-        int editsCount = 0;
-        for (WAL.Entry logEntry : entries) {
-          filterCellByStore(logEntry);
-          if (!logEntry.getEdit().isEmpty()) {
+      int editsCount = 0;
+      for (WAL.Entry logEntry : entries) {
+        filterCellByStore(logEntry);
+        if (!logEntry.getEdit().isEmpty()) {
+          try {
             writer.append(logEntry);
-            updateRegionMaximumEditLogSeqNum(logEntry);
-            editsCount++;
-          } else {
-            incrementSkippedEdits(1);
+          } catch (IOException e) {
+            logAndThrowWriterAppendFailure(logEntry, e);
           }
+          updateRegionMaximumEditLogSeqNum(logEntry);
+          editsCount++;
+        } else {
+          incrementSkippedEdits(1);
         }
-        // Pass along summary statistics
-        incrementEdits(editsCount);
-        incrementNanoTime(System.nanoTime() - startTime);
-      } catch (IOException e) {
-        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
-        LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e);
-        throw e;
       }
+      // Pass along summary statistics
+      incrementEdits(editsCount);
+      incrementNanoTime(System.nanoTime() - startTime);
+    }
+
+    private void logAndThrowWriterAppendFailure(WAL.Entry logEntry, IOException e)
+        throws IOException {
+      e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
+      final String errorMsg = "Failed to write log entry " + logEntry.toString() + " to log";
+      LOG.error(HBaseMarkers.FATAL, errorMsg, e);
+      updateStatusWithMsg(errorMsg);
+      throw e;
     }
 
     private void filterCellByStore(WAL.Entry logEntry) {
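The writeRegionEntries() rewrite above narrows the try/catch from the whole loop down to the single writer.append() call. A failure now names the entry that could not be written ("Failed to write log entry ... to log" instead of the old "Got while writing log entry to log"), lands in the journal via updateStatusWithMsg(), and is still rethrown after RemoteException unwrapping, so callers see the same exception as before. As in the old code, the summary counters are only updated when the loop completes without error.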
hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java

@@ -86,7 +86,7 @@ class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
   public List<Path> close() throws IOException {
     boolean isSuccessful = true;
     try {
-      isSuccessful &= finishWriterThreads(false);
+      isSuccessful = finishWriterThreads(false);
     } finally {
       isSuccessful &= writeRemainingEntryBuffers();
     }
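Here, and in the matching close() hunks for BoundedRecoveredHFilesOutputSink and RecoveredEditsOutputSink below, the compound '&=' becomes a plain assignment. isSuccessful still holds its initial value of true on that line, so 'true & x' was always just 'x'; the direct assignment states what actually happens.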
hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java

@@ -131,7 +131,7 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
   public List<Path> close() throws IOException {
     boolean isSuccessful = true;
     try {
-      isSuccessful &= finishWriterThreads(false);
+      isSuccessful = finishWriterThreads(false);
     } finally {
       isSuccessful &= writeRemainingEntryBuffers();
     }
hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java

@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.yetus.audience.InterfaceAudience;

@@ -61,6 +62,8 @@ public abstract class OutputSink {
    */
   protected final List<Path> splits = new ArrayList<>();
 
+  protected MonitoredTask status = null;
+
   /**
    * Used when close this output sink.
    */

@@ -81,6 +84,10 @@ public abstract class OutputSink {
     this.reporter = reporter;
   }
 
+  void setStatus(MonitoredTask status) {
+    this.status = status;
+  }
+
   /**
    * Start the threads that will pump data from the entryBuffers to the output files.
    */

@@ -135,7 +142,9 @@ public abstract class OutputSink {
       }
     }
     controller.checkForErrors();
-    LOG.info("{} split writer threads finished", this.writerThreads.size());
+    final String msg = this.writerThreads.size() + " split writer threads finished";
+    LOG.info(msg);
+    updateStatusWithMsg(msg);
     return (!progressFailed);
   }
 

@@ -150,6 +159,7 @@ public abstract class OutputSink {
 
   /**
    * @param buffer A buffer of some number of edits for a given region.
+   * @throws IOException For any IO errors
    */
   protected abstract void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException;
 

@@ -172,6 +182,16 @@ public abstract class OutputSink {
    */
   protected abstract boolean keepRegionEvent(WAL.Entry entry);
 
+  /**
+   * Set status message in {@link MonitoredTask} instance that is set in this OutputSink
+   * @param msg message to update the status with
+   */
+  protected final void updateStatusWithMsg(String msg) {
+    if (status != null) {
+      status.setStatus(msg);
+    }
+  }
+
   public static class WriterThread extends Thread {
     private volatile boolean shouldStop = false;
     private WALSplitter.PipelineController controller;
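updateStatusWithMsg() is final and null-guarded, so sinks can call it unconditionally even when no MonitoredTask was attached (for instance, a sink constructed directly in a test). Writer threads can safely read the plain status field because WALSplitter (below) assigns it before starting them, and Thread.start() supplies the needed happens-before edge:

    outputSink.setStatus(status);     // plain field write on the splitting thread
    outputSink.startWriterThreads();  // Thread.start() publishes it to the writers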
hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java

@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.yetus.audience.InterfaceAudience;

@@ -74,7 +75,7 @@ class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
    * @return null if this region shouldn't output any logs
    */
   private RecoveredEditsWriter getRecoveredEditsWriter(TableName tableName, byte[] region,
       long seqId) throws IOException {
     RecoveredEditsWriter ret = writers.get(Bytes.toString(region));
     if (ret != null) {
       return ret;

@@ -92,7 +93,7 @@ class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
   public List<Path> close() throws IOException {
     boolean isSuccessful = true;
     try {
-      isSuccessful &= finishWriterThreads(false);
+      isSuccessful = finishWriterThreads(false);
     } finally {
       isSuccessful &= closeWriters();
     }
hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java

@@ -269,6 +269,7 @@ public class WALSplitter {
 
     status = TaskMonitor.get().createStatus(
       "Splitting log file " + logfile.getPath() + "into a temporary staging area.");
+    status.enableStatusJournal(true);
     Reader logFileReader = null;
     this.fileBeingSplit = logfile;
     long startTS = EnvironmentEdgeManager.currentTime();
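status.enableStatusJournal(true) is the switch the rest of the commit depends on: from this point, every setStatus() made while splitting this file, including the updateStatusWithMsg() calls added to the output sinks, is appended to the task's journal rather than only replacing the visible status line, at a small memory cost proportional to the number of updates.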
@@ -276,7 +277,7 @@ public class WALSplitter {
     long logLength = logfile.getLen();
     LOG.info("Splitting WAL={}, size={} ({} bytes)", logPath, StringUtils.humanSize(logLength),
       logLength);
-    status.setStatus("Opening log file");
+    status.setStatus("Opening log file " + logPath);
     if (reporter != null && !reporter.progress()) {
       progressFailed = true;
       return false;
@@ -291,6 +292,7 @@ public class WALSplitter {
     int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
     int numOpenedFilesLastCheck = 0;
     outputSink.setReporter(reporter);
+    outputSink.setStatus(status);
     outputSink.startWriterThreads();
     outputSinkStarted = true;
     Entry entry;
@@ -375,7 +377,9 @@ public class WALSplitter {
       e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
       throw e;
     } finally {
-      LOG.debug("Finishing writing output logs and closing down");
+      final String log = "Finishing writing output logs and closing down";
+      LOG.debug(log);
+      status.setStatus(log);
       try {
         if (null != logFileReader) {
           logFileReader.close();
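Routing the "Finishing writing output logs and closing down" message through status.setStatus() makes the close-and-rename phase of the split visible in the task monitor as well, where previously it surfaced only at DEBUG level.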
@@ -400,6 +404,10 @@ public class WALSplitter {
         ", corrupted=" + isCorrupted + ", progress failed=" + progressFailed;
       LOG.info(msg);
       status.markComplete(msg);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("WAL split completed for {} , Journal Log: {}", logPath,
+          status.prettyPrintJournal());
+      }
     }
   }
   return !progressFailed;
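The completed journal is rendered only when DEBUG is enabled, and the explicit isDebugEnabled() guard avoids building the prettyPrintJournal() string just to discard it. To actually see the journals, raising the splitter's logger to DEBUG should suffice, e.g. log4j.logger.org.apache.hadoop.hbase.wal.WALSplitter=DEBUG in a log4j 1.x properties file (an assumption about the deployment's logging setup, not something this commit configures).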