HBASE-8321 Log split worker should heartbeat to avoid timeout when the hlog is under recovery

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1468974 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
jxiang 2013-04-17 15:34:11 +00:00
parent a80c7a7e95
commit 24c6f662c2
10 changed files with 149 additions and 128 deletions

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@ -81,13 +82,16 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
private volatile boolean exitWorker; private volatile boolean exitWorker;
private final Object grabTaskLock = new Object(); private final Object grabTaskLock = new Object();
private boolean workerInGrabTask = false; private boolean workerInGrabTask = false;
private final int report_period;
public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf,
ServerName serverName, TaskExecutor splitTaskExecutor) { ServerName serverName, TaskExecutor splitTaskExecutor) {
super(watcher); super(watcher);
this.serverName = serverName; this.serverName = serverName;
this.splitTaskExecutor = splitTaskExecutor; this.splitTaskExecutor = splitTaskExecutor;
report_period = conf.getInt("hbase.splitlog.report.period",
conf.getInt("hbase.splitlog.manager.timeout",
SplitLogManager.DEFAULT_TIMEOUT) / 2);
} }
public SplitLogWorker(ZooKeeperWatcher watcher, final Configuration conf, public SplitLogWorker(ZooKeeperWatcher watcher, final Configuration conf,
@ -274,15 +278,22 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
status = splitTaskExecutor.exec(ZKSplitLog.getFileName(currentTask), status = splitTaskExecutor.exec(ZKSplitLog.getFileName(currentTask),
new CancelableProgressable() { new CancelableProgressable() {
private long last_report_at = 0;
@Override @Override
public boolean progress() { public boolean progress() {
if (!attemptToOwnTask(false)) { long t = EnvironmentEdgeManager.currentTimeMillis();
LOG.warn("Failed to heartbeat the task" + currentTask); if ((t - last_report_at) > report_period) {
return false; last_report_at = t;
if (!attemptToOwnTask(false)) {
LOG.warn("Failed to heartbeat the task" + currentTask);
return false;
}
} }
return true; return true;
} }
}); });
switch (status) { switch (status) {
case DONE: case DONE:
endTask(new SplitLogTask.Done(this.serverName), SplitLogCounters.tot_wkr_task_done); endTask(new SplitLogTask.Done(this.serverName), SplitLogCounters.tot_wkr_task_done);

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader; import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer; import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
public class HLogFactory { public class HLogFactory {
@ -70,6 +71,11 @@ public class HLogFactory {
logReaderClass = null; logReaderClass = null;
} }
public static HLog.Reader createReader(final FileSystem fs,
final Path path, Configuration conf) throws IOException {
return createReader(fs, path, conf, null);
}
/** /**
* Create a reader for the WAL. If you are reading from a file that's being written to * Create a reader for the WAL. If you are reading from a file that's being written to
* and need to reopen it multiple times, use {@link HLog.Reader#reset()} instead of this method * and need to reopen it multiple times, use {@link HLog.Reader#reset()} instead of this method
@ -77,8 +83,8 @@ public class HLogFactory {
* @return A WAL reader. Close when done with it. * @return A WAL reader. Close when done with it.
* @throws IOException * @throws IOException
*/ */
public static HLog.Reader createReader(final FileSystem fs, public static HLog.Reader createReader(final FileSystem fs, final Path path,
final Path path, Configuration conf) throws IOException { Configuration conf, CancelableProgressable reporter) throws IOException {
if (logReaderClass == null) { if (logReaderClass == null) {
logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
SequenceFileLogReader.class, Reader.class); SequenceFileLogReader.class, Reader.class);
@ -102,6 +108,9 @@ public class HLogFactory {
if (++nbAttempt == 1) { if (++nbAttempt == 1) {
LOG.warn("Lease should have recovered. This is not expected. Will retry", e); LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
} }
if (reporter != null && !reporter.progress()) {
throw new InterruptedIOException("Operation is cancelled");
}
if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTimeMillis()) { if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTimeMillis()) {
LOG.error("Can't open after " + nbAttempt + " attempts and " LOG.error("Can't open after " + nbAttempt + " attempts and "
+ (EnvironmentEdgeManager.currentTimeMillis() - startWaiting) + (EnvironmentEdgeManager.currentTimeMillis() - startWaiting)

View File

@ -55,7 +55,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.exceptions.OrphanHLogAfterSplitException; import org.apache.hadoop.hbase.exceptions.OrphanHLogAfterSplitException;
import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
@ -113,9 +112,6 @@ public class HLogSplitter {
private MonitoredTask status; private MonitoredTask status;
// Used in distributed log splitting
private DistributedLogSplittingHelper distributedLogSplittingHelper = null;
// For checking the latest flushed sequence id // For checking the latest flushed sequence id
protected final LastSequenceId sequenceIdChecker; protected final LastSequenceId sequenceIdChecker;
@ -263,10 +259,6 @@ public class HLogSplitter {
return outputSink.getOutputCounts(); return outputSink.getOutputCounts();
} }
void setDistributedLogSplittingHelper(DistributedLogSplittingHelper helper) {
this.distributedLogSplittingHelper = helper;
}
/** /**
* Splits the HLog edits in the given list of logfiles (that are a mix of edits * Splits the HLog edits in the given list of logfiles (that are a mix of edits
* on multiple regions) by region and then splits them per region directories, * on multiple regions) by region and then splits them per region directories,
@ -317,7 +309,7 @@ public class HLogSplitter {
//meta only. However, there is a sequence number that can be obtained //meta only. However, there is a sequence number that can be obtained
//only by parsing.. so we parse for all files currently //only by parsing.. so we parse for all files currently
//TODO: optimize this part somehow //TODO: optimize this part somehow
in = getReader(fs, log, conf, skipErrors); in = getReader(fs, log, conf, skipErrors, null);
if (in != null) { if (in != null) {
parseHLog(in, logPath, entryBuffers, fs, conf, skipErrors); parseHLog(in, logPath, entryBuffers, fs, conf, skipErrors);
} }
@ -420,53 +412,58 @@ public class HLogSplitter {
CancelableProgressable reporter) throws IOException { CancelableProgressable reporter) throws IOException {
boolean isCorrupted = false; boolean isCorrupted = false;
Preconditions.checkState(status == null); Preconditions.checkState(status == null);
status = TaskMonitor.get().createStatus(
"Splitting log file " + logfile.getPath() +
"into a temporary staging area.");
boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
HLog.SPLIT_SKIP_ERRORS_DEFAULT); HLog.SPLIT_SKIP_ERRORS_DEFAULT);
int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024); int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
Path logPath = logfile.getPath(); Path logPath = logfile.getPath();
long logLength = logfile.getLen(); boolean outputSinkStarted = false;
LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
status.setStatus("Opening log file");
Reader in = null;
try {
in = getReader(fs, logfile, conf, skipErrors);
} catch (CorruptedLogFileException e) {
LOG.warn("Could not get reader, corrupted log file " + logPath, e);
ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
isCorrupted = true;
}
if (in == null) {
status.markComplete("Was nothing to split in log file");
LOG.warn("Nothing to split in log file " + logPath);
return true;
}
this.setDistributedLogSplittingHelper(new DistributedLogSplittingHelper(reporter));
if (!reportProgressIfIsDistributedLogSplitting()) {
return false;
}
boolean progress_failed = false; boolean progress_failed = false;
int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
int numOpenedFilesLastCheck = 0;
outputSink.startWriterThreads();
// Report progress every so many edits and/or files opened (opening a file
// takes a bit of time).
Map<byte[], Long> lastFlushedSequenceIds =
new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
Entry entry;
int editsCount = 0; int editsCount = 0;
int editsSkipped = 0; int editsSkipped = 0;
try { try {
status = TaskMonitor.get().createStatus(
"Splitting log file " + logfile.getPath() +
"into a temporary staging area.");
long logLength = logfile.getLen();
LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
status.setStatus("Opening log file");
if (reporter != null && !reporter.progress()) {
progress_failed = true;
return false;
}
Reader in = null;
try {
in = getReader(fs, logfile, conf, skipErrors, reporter);
} catch (CorruptedLogFileException e) {
LOG.warn("Could not get reader, corrupted log file " + logPath, e);
ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
isCorrupted = true;
}
if (in == null) {
status.markComplete("Was nothing to split in log file");
LOG.warn("Nothing to split in log file " + logPath);
return true;
}
int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
int numOpenedFilesLastCheck = 0;
outputSink.setReporter(reporter);
outputSink.startWriterThreads();
outputSinkStarted = true;
// Report progress every so many edits and/or files opened (opening a file
// takes a bit of time).
Map<byte[], Long> lastFlushedSequenceIds =
new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
Entry entry;
while ((entry = getNextLogLine(in,logPath, skipErrors)) != null) { while ((entry = getNextLogLine(in,logPath, skipErrors)) != null) {
byte[] region = entry.getKey().getEncodedRegionName(); byte[] region = entry.getKey().getEncodedRegionName();
Long lastFlushedSequenceId = -1l; Long lastFlushedSequenceId = -1l;
if (sequenceIdChecker != null) { if (sequenceIdChecker != null) {
lastFlushedSequenceId = lastFlushedSequenceIds.get(region); lastFlushedSequenceId = lastFlushedSequenceIds.get(region);
if (lastFlushedSequenceId == null) { if (lastFlushedSequenceId == null) {
lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region); lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
lastFlushedSequenceIds.put(region, lastFlushedSequenceId); lastFlushedSequenceIds.put(region, lastFlushedSequenceId);
} }
} }
if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) { if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
@ -482,7 +479,8 @@ public class HLogSplitter {
String countsStr = (editsCount - editsSkipped) + String countsStr = (editsCount - editsSkipped) +
" edits, skipped " + editsSkipped + " edits."; " edits, skipped " + editsSkipped + " edits.";
status.setStatus("Split " + countsStr); status.setStatus("Split " + countsStr);
if (!reportProgressIfIsDistributedLogSplitting()) { if (reporter != null && !reporter.progress()) {
progress_failed = true;
return false; return false;
} }
} }
@ -500,12 +498,13 @@ public class HLogSplitter {
throw e; throw e;
} finally { } finally {
LOG.info("Finishing writing output logs and closing down."); LOG.info("Finishing writing output logs and closing down.");
progress_failed = outputSink.finishWritingAndClose() == null; if (outputSinkStarted) {
progress_failed = outputSink.finishWritingAndClose() == null;
}
String msg = "Processed " + editsCount + " edits across " String msg = "Processed " + editsCount + " edits across "
+ outputSink.getOutputCounts().size() + " regions; log file=" + outputSink.getOutputCounts().size() + " regions; log file="
+ logPath + " is corrupted = " + isCorrupted + " progress failed = " + logPath + " is corrupted = " + isCorrupted + " progress failed = "
+ progress_failed; + progress_failed;
;
LOG.info(msg); LOG.info(msg);
status.markComplete(msg); status.markComplete(msg);
} }
@ -620,6 +619,7 @@ public class HLogSplitter {
* @return Path to file into which to dump split log edits. * @return Path to file into which to dump split log edits.
* @throws IOException * @throws IOException
*/ */
@SuppressWarnings("deprecation")
static Path getRegionSplitEditsPath(final FileSystem fs, static Path getRegionSplitEditsPath(final FileSystem fs,
final Entry logEntry, final Path rootDir, boolean isCreate) final Entry logEntry, final Path rootDir, boolean isCreate)
throws IOException { throws IOException {
@ -724,7 +724,7 @@ public class HLogSplitter {
* @throws CorruptedLogFileException * @throws CorruptedLogFileException
*/ */
protected Reader getReader(FileSystem fs, FileStatus file, Configuration conf, protected Reader getReader(FileSystem fs, FileStatus file, Configuration conf,
boolean skipErrors) boolean skipErrors, CancelableProgressable reporter)
throws IOException, CorruptedLogFileException { throws IOException, CorruptedLogFileException {
Path path = file.getPath(); Path path = file.getPath();
long length = file.getLen(); long length = file.getLen();
@ -739,9 +739,9 @@ public class HLogSplitter {
} }
try { try {
FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf); FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
try { try {
in = getReader(fs, path, conf); in = getReader(fs, path, conf, reporter);
} catch (EOFException e) { } catch (EOFException e) {
if (length <= 0) { if (length <= 0) {
// TODO should we ignore an empty, not-last log file if skip.errors // TODO should we ignore an empty, not-last log file if skip.errors
@ -757,8 +757,8 @@ public class HLogSplitter {
} }
} }
} catch (IOException e) { } catch (IOException e) {
if (!skipErrors) { if (!skipErrors || e instanceof InterruptedIOException) {
throw e; throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
} }
CorruptedLogFileException t = CorruptedLogFileException t =
new CorruptedLogFileException("skipErrors=true Could not open hlog " + new CorruptedLogFileException("skipErrors=true Could not open hlog " +
@ -826,9 +826,9 @@ public class HLogSplitter {
/** /**
* Create a new {@link Reader} for reading logs to split. * Create a new {@link Reader} for reading logs to split.
*/ */
protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf) protected Reader getReader(FileSystem fs, Path curLogFile,
throws IOException { Configuration conf, CancelableProgressable reporter) throws IOException {
return HLogFactory.createReader(fs, curLogFile, conf); return HLogFactory.createReader(fs, curLogFile, conf, reporter);
} }
/** /**
@ -1108,56 +1108,6 @@ public class HLogSplitter {
return ret; return ret;
} }
/***
* @return false if it is a distributed log splitting and it failed to report
* progress
*/
private boolean reportProgressIfIsDistributedLogSplitting() {
if (this.distributedLogSplittingHelper != null) {
return distributedLogSplittingHelper.reportProgress();
} else {
return true;
}
}
/**
* A class used in distributed log splitting
*
*/
class DistributedLogSplittingHelper {
// Report progress, only used in distributed log splitting
private final CancelableProgressable splitReporter;
// How often to send a progress report (default 1/2 master timeout)
private final int report_period;
private long last_report_at = 0;
public DistributedLogSplittingHelper(CancelableProgressable reporter) {
this.splitReporter = reporter;
report_period = conf.getInt("hbase.splitlog.report.period",
conf.getInt("hbase.splitlog.manager.timeout",
SplitLogManager.DEFAULT_TIMEOUT) / 2);
}
/***
* @return false if reporter failed progressing
*/
private boolean reportProgress() {
if (splitReporter == null) {
return true;
} else {
long t = EnvironmentEdgeManager.currentTimeMillis();
if ((t - last_report_at) > report_period) {
last_report_at = t;
if (this.splitReporter.progress() == false) {
LOG.warn("Failed: reporter.progress asked us to terminate");
return false;
}
}
return true;
}
}
}
/** /**
* Class that manages the output streams from the log splitting process. * Class that manages the output streams from the log splitting process.
*/ */
@ -1178,6 +1128,8 @@ public class HLogSplitter {
private final int numThreads; private final int numThreads;
private CancelableProgressable reporter = null;
public OutputSink() { public OutputSink() {
// More threads could potentially write faster at the expense // More threads could potentially write faster at the expense
// of causing more disk seeks as the logs are split. // of causing more disk seeks as the logs are split.
@ -1188,6 +1140,10 @@ public class HLogSplitter {
"hbase.regionserver.hlog.splitlog.writer.threads", 3); "hbase.regionserver.hlog.splitlog.writer.threads", 3);
} }
void setReporter(CancelableProgressable reporter) {
this.reporter = reporter;
}
/** /**
* Start the threads that will pump data from the entryBuffers * Start the threads that will pump data from the entryBuffers
* to the output files. * to the output files.
@ -1213,7 +1169,7 @@ public class HLogSplitter {
t.finish(); t.finish();
} }
for (WriterThread t : writerThreads) { for (WriterThread t : writerThreads) {
if (!progress_failed && !reportProgressIfIsDistributedLogSplitting()) { if (!progress_failed && reporter != null && !reporter.progress()) {
progress_failed = true; progress_failed = true;
} }
try { try {
@ -1309,7 +1265,7 @@ public class HLogSplitter {
for (int i = 0, n = logWriters.size(); i < n; i++) { for (int i = 0, n = logWriters.size(); i < n; i++) {
Future<Void> future = completionService.take(); Future<Void> future = completionService.take();
future.get(); future.get();
if (!progress_failed && !reportProgressIfIsDistributedLogSplitting()) { if (!progress_failed && reporter != null && !reporter.progress()) {
progress_failed = true; progress_failed = true;
} }
} }
@ -1437,8 +1393,6 @@ public class HLogSplitter {
} }
} }
/** /**
* Private data structure that wraps a Writer and its Path, * Private data structure that wraps a Writer and its Path,
* also collecting statistics about the data written to this * also collecting statistics about the data written to this

View File

@ -38,15 +38,15 @@ import java.io.InterruptedIOException;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class FSHDFSUtils extends FSUtils{ public class FSHDFSUtils extends FSUtils {
private static final Log LOG = LogFactory.getLog(FSHDFSUtils.class); private static final Log LOG = LogFactory.getLog(FSHDFSUtils.class);
/** /**
* Recover the lease from HDFS, retrying multiple times. * Recover the lease from HDFS, retrying multiple times.
*/ */
@Override @Override
public void recoverFileLease(final FileSystem fs, final Path p, Configuration conf) public void recoverFileLease(final FileSystem fs, final Path p,
throws IOException { Configuration conf, CancelableProgressable reporter) throws IOException {
// lease recovery not needed for local file system case. // lease recovery not needed for local file system case.
if (!(fs instanceof DistributedFileSystem)) { if (!(fs instanceof DistributedFileSystem)) {
return; return;
@ -81,6 +81,9 @@ public class FSHDFSUtils extends FSUtils{
", retrying.", e); ", retrying.", e);
} }
if (!recovered) { if (!recovered) {
if (reporter != null && !reporter.progress()) {
throw new InterruptedIOException("Operation is cancelled");
}
// try at least twice. // try at least twice.
if (nbAttempt > 2 && recoveryTimeout < EnvironmentEdgeManager.currentTimeMillis()) { if (nbAttempt > 2 && recoveryTimeout < EnvironmentEdgeManager.currentTimeMillis()) {
LOG.error("Can't recoverLease after " + nbAttempt + " attempts and " + LOG.error("Can't recoverLease after " + nbAttempt + " attempts and " +

View File

@ -36,7 +36,7 @@ public class FSMapRUtils extends FSUtils {
private static final Log LOG = LogFactory.getLog(FSMapRUtils.class); private static final Log LOG = LogFactory.getLog(FSMapRUtils.class);
public void recoverFileLease(final FileSystem fs, final Path p, public void recoverFileLease(final FileSystem fs, final Path p,
Configuration conf) throws IOException { Configuration conf, CancelableProgressable reporter) throws IOException {
LOG.info("Recovering file " + p.toString() + LOG.info("Recovering file " + p.toString() +
" by changing permission to readonly"); " by changing permission to readonly");
FsPermission roPerm = new FsPermission((short) 0444); FsPermission roPerm = new FsPermission((short) 0444);

View File

@ -206,6 +206,7 @@ public abstract class FSUtils {
* @return output stream to the created file * @return output stream to the created file
* @throws IOException if the file cannot be created * @throws IOException if the file cannot be created
*/ */
@SuppressWarnings("deprecation")
public static FSDataOutputStream create(FileSystem fs, Path path, public static FSDataOutputStream create(FileSystem fs, Path path,
FsPermission perm, boolean overwrite) throws IOException { FsPermission perm, boolean overwrite) throws IOException {
LOG.debug("Creating file=" + path + " with permission=" + perm); LOG.debug("Creating file=" + path + " with permission=" + perm);
@ -763,6 +764,7 @@ public abstract class FSUtils {
* @return true if exists * @return true if exists
* @throws IOException e * @throws IOException e
*/ */
@SuppressWarnings("deprecation")
public static boolean metaRegionExists(FileSystem fs, Path rootdir) public static boolean metaRegionExists(FileSystem fs, Path rootdir)
throws IOException { throws IOException {
Path rootRegionDir = Path rootRegionDir =
@ -1147,7 +1149,7 @@ public abstract class FSUtils {
* @throws IOException * @throws IOException
*/ */
public abstract void recoverFileLease(final FileSystem fs, final Path p, public abstract void recoverFileLease(final FileSystem fs, final Path p,
Configuration conf) throws IOException; Configuration conf, CancelableProgressable reporter) throws IOException;
/** /**
* @param fs * @param fs

View File

@ -753,7 +753,7 @@ public class RegionSplitter {
} else { } else {
LOG.debug("_balancedSplit file found. Replay log to restore state..."); LOG.debug("_balancedSplit file found. Replay log to restore state...");
FSUtils.getInstance(fs, table.getConfiguration()) FSUtils.getInstance(fs, table.getConfiguration())
.recoverFileLease(fs, splitFile, table.getConfiguration()); .recoverFileLease(fs, splitFile, table.getConfiguration(), null);
// parse split file and process remaining splits // parse split file and process remaining splits
FSDataInputStream tmpIn = fs.open(splitFile); FSDataInputStream tmpIn = fs.open(splitFile);

View File

@ -159,6 +159,7 @@ public class TestDistributedLogSplitting {
for (HRegionInfo hri : regions) { for (HRegionInfo hri : regions) {
Path tdir = HTableDescriptor.getTableDir(rootdir, table); Path tdir = HTableDescriptor.getTableDir(rootdir, table);
@SuppressWarnings("deprecation")
Path editsdir = Path editsdir =
HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName())); HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
LOG.debug("checking edits dir " + editsdir); LOG.debug("checking edits dir " + editsdir);

View File

@ -459,7 +459,7 @@ public class TestHLog {
public void run() { public void run() {
try { try {
FSUtils.getInstance(fs, rlConf) FSUtils.getInstance(fs, rlConf)
.recoverFileLease(recoveredFs, walPath, rlConf); .recoverFileLease(recoveredFs, walPath, rlConf, null);
} catch (IOException e) { } catch (IOException e) {
exception = e; exception = e;
} }

View File

@ -36,6 +36,7 @@ import java.util.Map;
import java.util.NavigableSet; import java.util.NavigableSet;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -64,6 +65,7 @@ import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry; import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader; import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.CorruptedLogFileException;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
@ -918,6 +920,45 @@ public class TestHLogSplit {
} }
} }
@Test
public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
generateHLogs(1, 10, -1);
FileStatus logfile = fs.listStatus(HLOGDIR)[0];
fs.initialize(fs.getUri(), conf);
final AtomicInteger count = new AtomicInteger();
CancelableProgressable localReporter
= new CancelableProgressable() {
@Override
public boolean progress() {
count.getAndIncrement();
return false;
}
};
FileSystem spiedFs = Mockito.spy(fs);
Mockito.doAnswer(new Answer<FSDataInputStream>() {
public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
Thread.sleep(1500); // Sleep a while and wait report status invoked
return (FSDataInputStream)invocation.callRealMethod();
}
}).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
try {
conf.setInt("hbase.splitlog.report.period", 1000);
HLogSplitter s = new HLogSplitter(conf, HBASEDIR, null, null, spiedFs, null);
boolean ret = s.splitLogFile(logfile, localReporter);
assertFalse("Log splitting should failed", ret);
assertTrue(count.get() > 0);
} catch (IOException e) {
fail("There shouldn't be any exception but: " + e.toString());
} finally {
// reset it back to its default value
conf.setInt("hbase.splitlog.report.period", 59000);
}
}
/** /**
* Test log split process with fake data and lots of edits to trigger threading * Test log split process with fake data and lots of edits to trigger threading
* issues. * issues.
@ -1000,8 +1041,8 @@ public class TestHLogSplit {
/* Produce a mock reader that generates fake entries */ /* Produce a mock reader that generates fake entries */
protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf) protected Reader getReader(FileSystem fs, Path curLogFile,
throws IOException { Configuration conf, CancelableProgressable reporter) throws IOException {
Reader mockReader = Mockito.mock(Reader.class); Reader mockReader = Mockito.mock(Reader.class);
Mockito.doAnswer(new Answer<HLog.Entry>() { Mockito.doAnswer(new Answer<HLog.Entry>() {
int index = 0; int index = 0;